停止 Parallel.ForEachAsync
Stop Parallel.ForEachAsync
在 C# 中,我对 停止 一个 Parallel.ForEachAsync
循环感兴趣(考虑 differences between Stop
and Break
);对于 Parallel.ForEach
我可以执行以下操作:
Parallel.ForEach(items, (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
state.Stop();
return;
}
// some process on the item
Process(item);
});
但是,由于我有一个进程需要异步执行,所以我转而使用Parallel.ForEachAsync
。 ForEachAsync
没有 Stop()
方法,我可以 break
循环如下,但我想知道这是否是破坏外观的最有效方法(在换句话说,循环需要在收到取消请求时尽快停止。
await Parallel.ForEachAsync(items, async (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// some async process on the item
await ProcessAsync(item);
});
您将需要这样的东西:
await Parallel.ForEachAsync(items, async (item, state) =>
{
await ProcessAsync(item, cancellationToken);
});
async Task ProcessAsync(string item, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
//Process
}
}
Parallel.ForEachAsync
lambda has a CancellationToken
as its second parameter. This token is supplied by the API, it's not the same token that you have passed in the ParallelOptions
. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, then the best you can do is to call the ThrowIfCancellationRequested
在 lambda 内部的关键位置:
var cts = new CancellationTokenSource();
var options = new ParallelOptions() { CancellationToken = cts.Token };
try
{
await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
//...
ct.ThrowIfCancellationRequested();
//...
await ProcessAsync(item, ct);
//...
ct.ThrowIfCancellationRequested();
//...
});
}
catch (OperationCanceledException ex)
{
// ...
}
在 lambda 中作为参数提供的令牌,即上例中的 ct
,不仅在 ParallelOptions.CancellationToken
被取消时被取消,而且在 ProcessAsync
操作的情况下也会被取消失败了。这种机制允许更快地传播异常。并行循环不会在错误发生时立即完成,因为它遵循不允许 fire-and-forget 操作的原则。循环内部启动的所有操作都必须在整个循环成功或失败之前完成。 lambda 中的令牌可以将这种延迟降至最低。
在 C# 中,我对 停止 一个 Parallel.ForEachAsync
循环感兴趣(考虑 differences between Stop
and Break
);对于 Parallel.ForEach
我可以执行以下操作:
Parallel.ForEach(items, (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
state.Stop();
return;
}
// some process on the item
Process(item);
});
但是,由于我有一个进程需要异步执行,所以我转而使用Parallel.ForEachAsync
。 ForEachAsync
没有 Stop()
方法,我可以 break
循环如下,但我想知道这是否是破坏外观的最有效方法(在换句话说,循环需要在收到取消请求时尽快停止。
await Parallel.ForEachAsync(items, async (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// some async process on the item
await ProcessAsync(item);
});
您将需要这样的东西:
await Parallel.ForEachAsync(items, async (item, state) =>
{
await ProcessAsync(item, cancellationToken);
});
async Task ProcessAsync(string item, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
//Process
}
}
Parallel.ForEachAsync
lambda has a CancellationToken
as its second parameter. This token is supplied by the API, it's not the same token that you have passed in the ParallelOptions
. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, then the best you can do is to call the ThrowIfCancellationRequested
在 lambda 内部的关键位置:
var cts = new CancellationTokenSource();
var options = new ParallelOptions() { CancellationToken = cts.Token };
try
{
await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
//...
ct.ThrowIfCancellationRequested();
//...
await ProcessAsync(item, ct);
//...
ct.ThrowIfCancellationRequested();
//...
});
}
catch (OperationCanceledException ex)
{
// ...
}
在 lambda 中作为参数提供的令牌,即上例中的 ct
,不仅在 ParallelOptions.CancellationToken
被取消时被取消,而且在 ProcessAsync
操作的情况下也会被取消失败了。这种机制允许更快地传播异常。并行循环不会在错误发生时立即完成,因为它遵循不允许 fire-and-forget 操作的原则。循环内部启动的所有操作都必须在整个循环成功或失败之前完成。 lambda 中的令牌可以将这种延迟降至最低。