为什么 RX 不是新线程上的 运行 处理程序?
Why is RX not running handler on a new thread?
我正在使用 RX 2.2.5
在下面的示例中,我希望处理程序(Subscribe()
的委托)在新线程上 运行 但是当 运行 应用程序时,所有 10 个数字都是一个接一个在同一个线程上消费。
Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(n =>
{
Thread.Sleep(1000);
Console.WriteLine(
"Value: {0} Thread: {1} IsPool: {2}",
n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
});
输出:
Main Thread: 1
Value: 1 on Thread: 4 IsPool: False
Value: 2 on Thread: 4 IsPool: False
Value: 3 on Thread: 4 IsPool: False
Value: 4 on Thread: 4 IsPool: False
Value: 5 on Thread: 4 IsPool: False
Value: 6 on Thread: 4 IsPool: False
Value: 7 on Thread: 4 IsPool: False
Value: 8 on Thread: 4 IsPool: False
Value: 9 on Thread: 4 IsPool: False
Value: 10 on Thread: 4 IsPool: False
它们 运行 顺序的事实也是一个谜,因为我使用 TaskPoolScheduler
来生成数字。
即使我用 TaskPoolScheduler
或 ThreadPoolScheduler
替换 NewThreadScheduler
我仍然得到一个线程,更有趣的部分是在这两种情况下 Thread.CurrentThread.IsThreadPoolThread
是False
.
我无法解释这种行为,因为当我查看 ThreadPoolScheduler
我看到:
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
SingleAssignmentDisposable d = new SingleAssignmentDisposable();
ThreadPool.QueueUserWorkItem((WaitCallback) (_ =>
{
if (d.IsDisposed)
return;
d.Disposable = action((IScheduler) this, state);
}), (object) null);
return (IDisposable) d;
}
我可以清楚地看到ThreadPool.QueueUserWorkItem...
所以为什么IsPool == False
?
我在这里错过了什么?
首先,Rx 有一个行为契约。它一次只会通过一个观察者处理一个值。如果多个观察者附加到一个可观察者(通常只有热可观察者),那么在产生下一个值之前,每个观察者都会运行一个单一的值,一次一个。
那就解释了为什么你的价值观是一个接一个产生的。
其次,关于为什么他们 运行 在同一个线程上。这基本上是基于上述行为契约优化性能的结果。启动新线程需要时间和资源,因此 Rx 团队尽可能重用线程以避免开销。
应该清楚的是,一个线程上的可观察对象 运行ning 产生值的速度可能比其观察者处理它们的速度更快。如果是这样,他们会排队。因此,对于启动新线程以进行观察的任何调度程序,逻辑是这样的 - 如果观察者完成处理一个值,则当前 运行ning 线程检查队列以查看是否还有另一个值要处理如果有它处理它 - 不需要新线程。如果没有,则线程结束或返回到 TaskPool
等,因此当新值可用时,该线程消失并且需要替代方案。 NewThreadScheduler
向上旋转一个。 TaskPoolScheduler
从 TaskPool
等中得到一个
因此,这归结为一个简单的优化,可以在值排队等待顺序处理时加快处理速度。
在我的测试中,当使用 NewThreadScheduler
时,我无法创建一个为每个新值使用新线程的单个可观察样本,但后来我在 NewThreadScheduler
来源中找到了这个:
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
EventLoopScheduler eventLoopScheduler = new EventLoopScheduler(this._threadFactory);
eventLoopScheduler.ExitIfEmpty = true;
return eventLoopScheduler.Schedule<TState>(state, dueTime, action);
}
因此,在幕后它正在创建一个新的 EventLoopScheduler
(单一不变线程调度程序)并将调度移交给它。难怪跟帖不变。
显然这是设计使然!正如 Microsoft 解释的那样:
...Both the TaskPool and CLR ThreadPool schedulers support
long-running operations. The former does so through
TaskCreationOptions.LongRunning (which really - in today's
implementation of the TPL - amounts to creating a new thread); the
latter calls into NewThread to achieve the same effect. What matters
more than the specific type of scheduler used is the achieved effect.
In this particular case, one expects asynchrony caused by introduction
of additional concurrency.
还接着说:
In case you really want to have a thread pool thread under any
circumstance (e.g. to enforce a global maximum degree of parallelism
for your app - though keep in mind things like
TaskCreationOptions.LongRunning are wildcards to bypass this
mechanism), you'll have to apply the DisableOptimizations extension
method to the ThreadPoolScheduler instance to make it fall back to
recursive behavior. Notice you can pass in the interfaces you want to
disable (specifying none means disabling all optimizations), in this
case typeof(ISchedulerLongRunning) suffices.
根据我上面的评论,您似乎正在尝试使用 Rx(一个用于查询和组合可观察数据序列的库)作为并行计算库?
我认为要看到您期望的结果,您可以将查询改成这样
Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
.SelectMany(i=>Observable.Start(()=>i, NewThreadScheduler.Default))
//.ObserveOn(NewThreadScheduler.Default)
.Subscribe(n =>
{
Thread.Sleep(1000);
Console.WriteLine(
"Value: {0} Thread: {1} IsPool: {2}",
n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
});
输出:
Main Thread: 11
Value: 1 Thread: 15 IsPool: False
Value: 4 Thread: 21 IsPool: False
Value: 2 Thread: 14 IsPool: False
Value: 3 Thread: 13 IsPool: False
Value: 5 Thread: 21 IsPool: False
Value: 6 Thread: 21 IsPool: False
Value: 7 Thread: 21 IsPool: False
Value: 8 Thread: 21 IsPool: False
Value: 9 Thread: 21 IsPool: False
Value: 10 Thread: 21 IsPool: False
请注意,我们在这里看到多个线程在起作用,但也请注意,我们现在得到乱序值(很明显,因为我们引入了我们不再控制的并发)。所以这是选择你的毒药(或你合适的库)的情况
我正在使用 RX 2.2.5
在下面的示例中,我希望处理程序(Subscribe()
的委托)在新线程上 运行 但是当 运行 应用程序时,所有 10 个数字都是一个接一个在同一个线程上消费。
Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(n =>
{
Thread.Sleep(1000);
Console.WriteLine(
"Value: {0} Thread: {1} IsPool: {2}",
n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
});
输出:
Main Thread: 1
Value: 1 on Thread: 4 IsPool: False
Value: 2 on Thread: 4 IsPool: False
Value: 3 on Thread: 4 IsPool: False
Value: 4 on Thread: 4 IsPool: False
Value: 5 on Thread: 4 IsPool: False
Value: 6 on Thread: 4 IsPool: False
Value: 7 on Thread: 4 IsPool: False
Value: 8 on Thread: 4 IsPool: False
Value: 9 on Thread: 4 IsPool: False
Value: 10 on Thread: 4 IsPool: False
它们 运行 顺序的事实也是一个谜,因为我使用 TaskPoolScheduler
来生成数字。
即使我用 TaskPoolScheduler
或 ThreadPoolScheduler
替换 NewThreadScheduler
我仍然得到一个线程,更有趣的部分是在这两种情况下 Thread.CurrentThread.IsThreadPoolThread
是False
.
我无法解释这种行为,因为当我查看 ThreadPoolScheduler
我看到:
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
SingleAssignmentDisposable d = new SingleAssignmentDisposable();
ThreadPool.QueueUserWorkItem((WaitCallback) (_ =>
{
if (d.IsDisposed)
return;
d.Disposable = action((IScheduler) this, state);
}), (object) null);
return (IDisposable) d;
}
我可以清楚地看到ThreadPool.QueueUserWorkItem...
所以为什么IsPool == False
?
我在这里错过了什么?
首先,Rx 有一个行为契约。它一次只会通过一个观察者处理一个值。如果多个观察者附加到一个可观察者(通常只有热可观察者),那么在产生下一个值之前,每个观察者都会运行一个单一的值,一次一个。
那就解释了为什么你的价值观是一个接一个产生的。
其次,关于为什么他们 运行 在同一个线程上。这基本上是基于上述行为契约优化性能的结果。启动新线程需要时间和资源,因此 Rx 团队尽可能重用线程以避免开销。
应该清楚的是,一个线程上的可观察对象 运行ning 产生值的速度可能比其观察者处理它们的速度更快。如果是这样,他们会排队。因此,对于启动新线程以进行观察的任何调度程序,逻辑是这样的 - 如果观察者完成处理一个值,则当前 运行ning 线程检查队列以查看是否还有另一个值要处理如果有它处理它 - 不需要新线程。如果没有,则线程结束或返回到 TaskPool
等,因此当新值可用时,该线程消失并且需要替代方案。 NewThreadScheduler
向上旋转一个。 TaskPoolScheduler
从 TaskPool
等中得到一个
因此,这归结为一个简单的优化,可以在值排队等待顺序处理时加快处理速度。
在我的测试中,当使用 NewThreadScheduler
时,我无法创建一个为每个新值使用新线程的单个可观察样本,但后来我在 NewThreadScheduler
来源中找到了这个:
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
EventLoopScheduler eventLoopScheduler = new EventLoopScheduler(this._threadFactory);
eventLoopScheduler.ExitIfEmpty = true;
return eventLoopScheduler.Schedule<TState>(state, dueTime, action);
}
因此,在幕后它正在创建一个新的 EventLoopScheduler
(单一不变线程调度程序)并将调度移交给它。难怪跟帖不变。
显然这是设计使然!正如 Microsoft 解释的那样:
...Both the TaskPool and CLR ThreadPool schedulers support long-running operations. The former does so through TaskCreationOptions.LongRunning (which really - in today's implementation of the TPL - amounts to creating a new thread); the latter calls into NewThread to achieve the same effect. What matters more than the specific type of scheduler used is the achieved effect. In this particular case, one expects asynchrony caused by introduction of additional concurrency.
还接着说:
In case you really want to have a thread pool thread under any circumstance (e.g. to enforce a global maximum degree of parallelism for your app - though keep in mind things like TaskCreationOptions.LongRunning are wildcards to bypass this mechanism), you'll have to apply the DisableOptimizations extension method to the ThreadPoolScheduler instance to make it fall back to recursive behavior. Notice you can pass in the interfaces you want to disable (specifying none means disabling all optimizations), in this case typeof(ISchedulerLongRunning) suffices.
根据我上面的评论,您似乎正在尝试使用 Rx(一个用于查询和组合可观察数据序列的库)作为并行计算库?
我认为要看到您期望的结果,您可以将查询改成这样
Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);
var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
.SelectMany(i=>Observable.Start(()=>i, NewThreadScheduler.Default))
//.ObserveOn(NewThreadScheduler.Default)
.Subscribe(n =>
{
Thread.Sleep(1000);
Console.WriteLine(
"Value: {0} Thread: {1} IsPool: {2}",
n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
});
输出:
Main Thread: 11
Value: 1 Thread: 15 IsPool: False
Value: 4 Thread: 21 IsPool: False
Value: 2 Thread: 14 IsPool: False
Value: 3 Thread: 13 IsPool: False
Value: 5 Thread: 21 IsPool: False
Value: 6 Thread: 21 IsPool: False
Value: 7 Thread: 21 IsPool: False
Value: 8 Thread: 21 IsPool: False
Value: 9 Thread: 21 IsPool: False
Value: 10 Thread: 21 IsPool: False
请注意,我们在这里看到多个线程在起作用,但也请注意,我们现在得到乱序值(很明显,因为我们引入了我们不再控制的并发)。所以这是选择你的毒药(或你合适的库)的情况