使用取消令牌停止 Parallel.ForEach 并停止
Stopping Parallel.ForEach with Cancellation Token and Stop
我不确定我是否按我的意愿停止了 Parallel.ForEach
循环。
所以让我概述一下这个问题。
该循环使用了一个可用连接数有限的数据库驱动程序,并且需要跟踪打开的连接数,因此数据库驱动程序不会抛出异常。问题是跟踪打开的连接是手动实现的(这应该重构 - 编写包装器或使用 AutoResetEvent
但还有一些其他事情需要首先处理)。所以我需要跟踪打开的连接,尤其是我必须处理异常情况:
Parallel.ForEach(hugeLists, parallelOptions, currentList => {
WaitForDatabaseConnection();
try {
Interlocked.Increment(ref numOfOpenConnections);
DoDatabaseCallAndInsertions();
} catch (Exception ex) {
// logging
throw;
} finally {
Interlocked.Decrement(ref numOfOpenConnections);
}
}
这是没有取消的循环的简化版本。为了在出现异常时提高性能,在抛出异常时应尽快取消循环。如果一件事失败了,循环应该停止。
我怎样才能确保 numOfOpenConnections
得到正确更新?
到目前为止我尝试了什么(这是我想要的还是我遗漏了什么?):
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
parallelOptions.CancellationToken.ThrowIfCancellationRequested();
WaitForDatabaseConnection();
try {
Interlocked.Increment(ref numOfOpenConnections);
DoDatabaseCallAndInsertions();
} catch (Exception ex) {
// logging
cancellationTokenSource.Cancel();
parallelLoopState.Stop();
throw; // still want to preserve the original exception information
} finally {
Interlocked.Decrement(ref numOfOpenConnections);
}
}
我可以将此代码包装在 try - catch 构造中并捕获 AggregateException
.
您可以调用 DoDatabaseCallAndInsertions
方法,仅在循环状态不是 exceptional 时等待其完成,否则忘记它并允许并行循环完成立即地。使用可取消包装器可能是实现此目的的最简单方法。这是一个等待函数完成或 CancellationToken
被取消的方法 RunAsCancelable
,无论先到者是什么:
public static TResult RunAsCancelable<TResult>(Func<TResult> function,
CancellationToken token)
{
token.ThrowIfCancellationRequested();
Task<TResult> task = Task.Run(function, token);
try
{
// Wait for the function to complete, or the token to become canceled
task.Wait(token);
}
catch { } // Prevent an AggregateException to be thrown
token.ThrowIfCancellationRequested();
// Propagate the result, or the original exception unwrapped
return task.GetAwaiter().GetResult();
}
public static void RunAsCancelable(Action action, CancellationToken token)
=> RunAsCancelable<object>(() => { action(); return null; }, token);
如果令牌在 action
完成之前被取消,RunAsCancelable
方法将抛出 OperationCanceledException
,或者传播 action
中发生的异常,或者如果 action
成功完成,则成功完成。
用法示例:
using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
{
WaitForDatabaseConnection();
try
{
Interlocked.Increment(ref numOfOpenConnections);
RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
failureCTS.Token);
}
catch (OperationCanceledException ex)
when (ex.CancellationToken == failureCTS.Token)
{
// Do nothing (an exception occurred in another thread)
}
catch (Exception ex)
{
Log.Error(ex);
failureCTS.Cancel(); // Signal failure to the other threads
throw; // Inform the parallel loop that an error has occurred
}
finally
{
Interlocked.Decrement(ref numOfOpenConnections);
}
});
}
DoDatabaseCallAndInsertions
方法可以在各个点检查CancellationToken
参数的属性IsCancellationRequested
,并在需要时执行事务回滚。
需要注意的是,RunAsCancelable
方法在使用 ThreadPool
线程方面相当浪费。必须阻塞一个额外的线程才能使每个提供的操作可取消,因此每次执行 lambda 都需要两个线程。为了防止 ThreadPool
可能出现饥饿,最好在切换到每 500 毫秒创建一个算法之前增加线程池按需创建的最小线程数,方法是使用ThreadPool.SetMinThreads
应用程序启动时的方法。
ThreadPool.SetMinThreads(100, 10);
重要提示:上述解决方案没有尝试记录可能被遗忘的操作异常。只会记录第一个失败操作的异常。
我不确定我是否按我的意愿停止了 Parallel.ForEach
循环。
所以让我概述一下这个问题。
该循环使用了一个可用连接数有限的数据库驱动程序,并且需要跟踪打开的连接数,因此数据库驱动程序不会抛出异常。问题是跟踪打开的连接是手动实现的(这应该重构 - 编写包装器或使用 AutoResetEvent
但还有一些其他事情需要首先处理)。所以我需要跟踪打开的连接,尤其是我必须处理异常情况:
Parallel.ForEach(hugeLists, parallelOptions, currentList => {
WaitForDatabaseConnection();
try {
Interlocked.Increment(ref numOfOpenConnections);
DoDatabaseCallAndInsertions();
} catch (Exception ex) {
// logging
throw;
} finally {
Interlocked.Decrement(ref numOfOpenConnections);
}
}
这是没有取消的循环的简化版本。为了在出现异常时提高性能,在抛出异常时应尽快取消循环。如果一件事失败了,循环应该停止。
我怎样才能确保 numOfOpenConnections
得到正确更新?
到目前为止我尝试了什么(这是我想要的还是我遗漏了什么?):
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
parallelOptions.CancellationToken.ThrowIfCancellationRequested();
WaitForDatabaseConnection();
try {
Interlocked.Increment(ref numOfOpenConnections);
DoDatabaseCallAndInsertions();
} catch (Exception ex) {
// logging
cancellationTokenSource.Cancel();
parallelLoopState.Stop();
throw; // still want to preserve the original exception information
} finally {
Interlocked.Decrement(ref numOfOpenConnections);
}
}
我可以将此代码包装在 try - catch 构造中并捕获 AggregateException
.
您可以调用 DoDatabaseCallAndInsertions
方法,仅在循环状态不是 exceptional 时等待其完成,否则忘记它并允许并行循环完成立即地。使用可取消包装器可能是实现此目的的最简单方法。这是一个等待函数完成或 CancellationToken
被取消的方法 RunAsCancelable
,无论先到者是什么:
public static TResult RunAsCancelable<TResult>(Func<TResult> function,
CancellationToken token)
{
token.ThrowIfCancellationRequested();
Task<TResult> task = Task.Run(function, token);
try
{
// Wait for the function to complete, or the token to become canceled
task.Wait(token);
}
catch { } // Prevent an AggregateException to be thrown
token.ThrowIfCancellationRequested();
// Propagate the result, or the original exception unwrapped
return task.GetAwaiter().GetResult();
}
public static void RunAsCancelable(Action action, CancellationToken token)
=> RunAsCancelable<object>(() => { action(); return null; }, token);
如果令牌在 action
完成之前被取消,RunAsCancelable
方法将抛出 OperationCanceledException
,或者传播 action
中发生的异常,或者如果 action
成功完成,则成功完成。
用法示例:
using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
{
WaitForDatabaseConnection();
try
{
Interlocked.Increment(ref numOfOpenConnections);
RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
failureCTS.Token);
}
catch (OperationCanceledException ex)
when (ex.CancellationToken == failureCTS.Token)
{
// Do nothing (an exception occurred in another thread)
}
catch (Exception ex)
{
Log.Error(ex);
failureCTS.Cancel(); // Signal failure to the other threads
throw; // Inform the parallel loop that an error has occurred
}
finally
{
Interlocked.Decrement(ref numOfOpenConnections);
}
});
}
DoDatabaseCallAndInsertions
方法可以在各个点检查CancellationToken
参数的属性IsCancellationRequested
,并在需要时执行事务回滚。
需要注意的是,RunAsCancelable
方法在使用 ThreadPool
线程方面相当浪费。必须阻塞一个额外的线程才能使每个提供的操作可取消,因此每次执行 lambda 都需要两个线程。为了防止 ThreadPool
可能出现饥饿,最好在切换到每 500 毫秒创建一个算法之前增加线程池按需创建的最小线程数,方法是使用ThreadPool.SetMinThreads
应用程序启动时的方法。
ThreadPool.SetMinThreads(100, 10);
重要提示:上述解决方案没有尝试记录可能被遗忘的操作异常。只会记录第一个失败操作的异常。