如何使用 CancellationToken 停止多个 Parallel.For 循环

How to stop multiple Parallel.For loops with CancellationToken

我正在使用 C# 和 WPF 开发应用程序。我正在使用 3 个嵌套的 Parallel.For 循环,如下所示。当我 Cancel() 令牌时,循环开始抛出 ImportAbortedException 但是,我无法捕捉到 ImportAbortedException。我咳嗽的是AggregateException

我想要的是,停止所有 Parallel.For 并抓住 ImportAbortedException 并做一些其他事情。

这是代码。

    private int _loopCount1 = 100;
    private int _loopCount2 = 200;
    private int _loopCount3 = 10000;
    private CancellationToken _cToken;
    private CancellationTokenSource _cSource;
    private void Init()
    {
        _cSource = new CancellationTokenSource();
        _cToken = new CancellationToken();
        _cToken = _cSource.Token;

        try
        {
            DoTheWork();
        }
        catch (ImportAbortedException)
        {
            /// 
        }
        catch (Exception)
        {

        }
    }

    private void StopAllLoops()
    {
        _cSource.Cancel();
    }

    private void DoTheWork()
    {
        Parallel.For(0, _loopCount1, i =>
        {
            if (CheckIfCanceled())
                throw new ImportAbortedException("process aborted!");

            // do a few calculations here.

            Parallel.For(0, _loopCount2, j =>
            {
                if (CheckIfCanceled())
                    throw new ImportAbortedException("process aborted!");

                // do a few calculations here.

                Parallel.For(0, _loopCount3, k =>
                {
                    if (CheckIfCanceled())
                        throw new ImportAbortedException("process aborted!");

                    // do some other process here.
                });
            });
        });
    }

    private bool CheckIfCanceled()
    {
        return _cToken.IsCancellationRequested;
    }

Parallel.For 的 ParallelOptions 属性有一个 CancellationToken 属性 您可以传递它,因此当取消标记被取消时,并行 for 将停止并产生 OperationCanceledException。

See MSDN ParallelOptions

我会完全避免使用 Parallel.For 并使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main" & "Rx-WPF")。您可以使用它巧妙地处理所有并行处理,并且可以将结果编组回 UI 线程。

您的代码如下所示:

private IDisposable DoTheWork()
{
    var query =
        from i in Observable.Range(0, _loopCount1)
        from x in Observable.Start(() => SomeCalculation1(i))
        from j in Observable.Range(0, _loopCount2)
        from y in Observable.Start(() => SomeCalculation2(i, j))
        from k in Observable.Range(0, _loopCount3)
        from z in Observable.Start(() => SomeCalculation3(i, j, k))
        select new { x, y, z };

    return
        query
            .ObserveOnDispatcher()
            .Subscribe(w =>
            {
                /* Do something with w.x, w.y, w.z */
            });
}

你可以这样称呼它:

var subscription = DoTheWork();

要取消,您只需这样做:

subscription.Dispose();

都是多线程的,UI安全,可以轻松取消。