了解 AsParallel 和 AsSequential:LINQ 查询的哪一部分是并发的?

Understanding AsParallel and AsSequential: which part of the LINQ query is concurrent?

我想了解是否可以使用非线程安全的 class 留给 AsParallel 查询。类似于:

src.Select(item => nonSafeClass.Process(item))
   .AsParallel()
   .Select(item => DoComputationalIntenseButThreadSafeWork(item));

我已经尝试 运行 下面的代码来查看查询链的哪一部分是并行执行的,而不是:

IEnumerable<int> array = Enumerable.Range(0, short.MaxValue).ToArray();
array.Select(i =>
    {
        Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
        return i;
    }).AsParallel().Select(i =>
        {
            Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;

        }).AsSequential().Select(i =>
            {
                Console.WriteLine("Step Three: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).ToList();

但令我惊讶的是,"Step One" 和 "Step Three" 都出现在不同的线程 ID 上。我期望只看到 "Step Two" 的不同线程 ID,因为它介于 AsParallelAsSequential 之间。我的想法错了吗?

这是因为deferred execution.

how chained queries in Linq work.

如果改成最简单的情况

array.Select(i =>
        {
            Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;
        }).Select(i =>
        {
            Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;

        }).Select(i =>
        {
            Console.WriteLine("Step Three: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;
        }).ToList();

你会看到这个: 步骤1 第2步 步骤 3 步骤1 第2步 步骤 3 ... ...

现在想象一下你的假设是否正确:

第一个 Select() 运行s 在 Thread 1(主线程)上。然后你的 AsParallel 运行s 在不同的线程上,但最后,你的最终 AsSequential() 需要在同一个线程上 运行,这意味着它没有任何区别AsParallel 运行在不同的线程上运行,因为 Thread 1 被阻塞。

你想的流程是: 1 -> x -> 1

紧随其后 1 -> y -> 1

等等等等。

作为优化,当 Linq 检测到您有一个 select 后跟 AsParallel 时,它会在单独的线程上为每个迭代 运行 设置它们。同样,这是因为从 1 -> x -> 1 -> y 出发不会在 "parallel".

中产生任何东西 运行

通过运行简化版本试试看:

array.Select(i =>
        {
            Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;
        }).AsParallel().Select(i =>
        {
            Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;

        }).ToList();

您会在 "sequence" 中看到第 1 步和第 2 步,但每次迭代都在不同的线程上完成。

然而,您的 AsSequential() 将 运行 在执行它的主线程上。

因此,我希望第 1 步和第 2 步到 运行 在同一个线程上,这不同于调用线程 但第 3 步到 运行 在启动链的同一个线程上。

如果您想实现您描述的行为,只需将您的查询更改为:

array.Select(i =>
        {
            Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;
        }).ToList().AsParallel().Select(i =>
        {
            Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;

        }).AsSequential().Select(i =>
        {
            Console.WriteLine("Step Three: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;
        }).ToList();

第一个 ToList() 评估将 运行 调用线程上的所有内容,然后是 AsParallel() 运行s 在不同线程上的每次迭代(取决于 ThreadPool 可用性),最后,您的AsSequential 将确保顺序位在调用线程上是 运行。