使用取消令牌停止 Parallel.ForEach 并停止

Stopping Parallel.ForEach with Cancellation Token and Stop

我不确定我是否按我的意愿停止了 Parallel.ForEach 循环。 所以让我概述一下这个问题。

该循环使用了一个可用连接数有限的数据库驱动程序,并且需要跟踪打开的连接数,因此数据库驱动程序不会抛出异常。问题是跟踪打开的连接是手动实现的(这应该重构 - 编写包装器或使用 AutoResetEvent 但还有一些其他事情需要首先处理)。所以我需要跟踪打开的连接,尤其是我必须处理异常情况:

Parallel.ForEach(hugeLists, parallelOptions, currentList => {
  WaitForDatabaseConnection();
  try {
     Interlocked.Increment(ref numOfOpenConnections);  
     DoDatabaseCallAndInsertions();
  } catch (Exception ex) {
     // logging
     throw;
  } finally {
     Interlocked.Decrement(ref numOfOpenConnections);
  } 
}

这是没有取消的循环的简化版本。为了在出现异常时提高性能,在抛出异常时应尽快取消循环。如果一件事失败了,循环应该停止。

我怎样才能确保 numOfOpenConnections 得到正确更新?

到目前为止我尝试了什么(这是我想要的还是我遗漏了什么?):

Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
  parallelOptions.CancellationToken.ThrowIfCancellationRequested();
  WaitForDatabaseConnection();
  try {     
     Interlocked.Increment(ref numOfOpenConnections);  
     DoDatabaseCallAndInsertions();
  } catch (Exception ex) {
     // logging
     cancellationTokenSource.Cancel();  
     parallelLoopState.Stop();
     throw; // still want to preserve the original exception information
  } finally {
     Interlocked.Decrement(ref numOfOpenConnections);
  } 
}

我可以将此代码包装在 try - catch 构造中并捕获 AggregateException.

您可以调用 DoDatabaseCallAndInsertions 方法,仅在循环状态不是 exceptional 时等待其完成,否则忘记它并允许并行循环完成立即地。使用可取消包装器可能是实现此目的的最简单方法。这是一个等待函数完成或 CancellationToken 被取消的方法 RunAsCancelable,无论先到者是什么:

public static TResult RunAsCancelable<TResult>(Func<TResult> function,
    CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    Task<TResult> task = Task.Run(function, token);
    try
    {
        // Wait for the function to complete, or the token to become canceled
        task.Wait(token);
    }
    catch { } // Prevent an AggregateException to be thrown

    token.ThrowIfCancellationRequested();
    // Propagate the result, or the original exception unwrapped
    return task.GetAwaiter().GetResult();
}

public static void RunAsCancelable(Action action, CancellationToken token)
    => RunAsCancelable<object>(() => { action(); return null; }, token);

如果令牌在 action 完成之前被取消,RunAsCancelable 方法将抛出 OperationCanceledException,或者传播 action 中发生的异常,或者如果 action 成功完成,则成功完成。

用法示例:

using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
    Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
    {
        WaitForDatabaseConnection();
        try
        {
            Interlocked.Increment(ref numOfOpenConnections);
            RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
                failureCTS.Token);
        }
        catch (OperationCanceledException ex)
            when (ex.CancellationToken == failureCTS.Token)
        {
            // Do nothing (an exception occurred in another thread)
        }
        catch (Exception ex)
        {
            Log.Error(ex);
            failureCTS.Cancel(); // Signal failure to the other threads
            throw; // Inform the parallel loop that an error has occurred
        }
        finally
        {
            Interlocked.Decrement(ref numOfOpenConnections);
        }
    });
}

DoDatabaseCallAndInsertions方法可以在各个点检查CancellationToken参数的属性IsCancellationRequested,并在需要时执行事务回滚。

需要注意的是,RunAsCancelable 方法在使用 ThreadPool 线程方面相当浪费。必须阻塞一个额外的线程才能使每个提供的操作可取消,因此每次执行 lambda 都需要两个线程。为了防止 ThreadPool 可能出现饥饿,最好在切换到每 500 毫秒创建一个算法之前增加线程池按需创建的最小线程数,方法是使用ThreadPool.SetMinThreads 应用程序启动时的方法。

ThreadPool.SetMinThreads(100, 10);

重要提示:上述解决方案没有尝试记录可能被遗忘的操作异常。只会记录第一个失败操作的异常。