这个线程代码有解释吗?

Is there explanation for this threading code?

所以遇到了一些与此非常相似的代码。我只是想知道是否有人可以向我解释一下。

查看它如何使用 RX 调度程序,然后 Parallel.For 以及其中一个新的 TaskFactory.StartNew

    IDisposable subscription = someObservable.ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(o =>
       {
           Parallel.ForEach(xxxs,
               x =>
               {
                   var theKey = x.Key;
                   if (!theTasks.ContainsKey(theKey) ||
                       theTasks.ContainsKey(theKey) && theTasks[theKey].IsCompleted)
                   {
                       theTasks[theKey] = Task.Factory.StartNew(
                           () =>
                           {
                               .....
                               }
                               catch (CommunicationObjectAbortedException ex)
                               {
                               ....
                               }
                               catch (ObjectDisposedException ex)
                               {
                               ....
                               }
                               catch (Exception e)
                               {
                               ....
                               }
                           });
                   }
               });
       },
       ex =>
       {
          ....
       },
       () =>
       {
          ....
       });
    } 

我知道所有这些东西单独做什么,但我不太确定这里的组合线程效果是什么。谁能猜猜

啊是的,并发Turducken

ThreadPoolScheduler 线程池 上安排工作,这不同于 任务池 ThreadPoolScheduler 旨在用于任务池不可用的平台 - 尽可能使用 TaskPoolScheduler

感觉就像作者试图通过使用线程池来为手头的任务(请原谅双关语)节省任务池。

Parallel.ForEach 阻塞直到循环完成。因此,当它在线程池上 运行ning 时,当发出新项目时,在从线程池借来的线程上执行下一个 ForEach

至于内部位,作者希望每个唯一键有一个 Task 运行,如果还没有 运行ning。