调度程序:立即与当前线程

Schedulers: Immediate vs. CurrentThread

阅读 explanation 为什么

Observable.Return(5)
  .Repeat()
  .Take(1)

永远不会完成,但是

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)

按预期工作。我仍然很困惑,我不知道为什么 CurrentThread 实际上解决了问题。谁能给个明确的解释?

Ned Stoyanov 在上面的评论中提供的link Dave Sexton 有很好的解释。

我会尝试用不同的方式来说明它。以 RecursiveMethod.

中发生递归调用为例
public class RecursiveTest()
{
    private bool _isDone;

    public void RecursiveMethod()
    {
        if (!_isDone)
        {
            RecursiveMethod();

           // Never gets here...
           _isDone = true;
        }
    }  
}

您可以很容易地看出这将无限期地递归(直到出现 WhosebugException),因为 _isDone 永远不会设置为 true。这是一个过于简化的示例,但它基本上就是您的第一个示例所发生的事情。

这是 Dave Sexton 的解释,用于描述您的第一个示例中发生的情况。

By default, Return uses the ImmediateScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return. Because there's no trampoline in Return, this pattern repeats itself, blocking the current thread indefinitely. Calling Subscribe on this observable never returns.

换句话说,由于重入的无限循环,初始流程永远不会完全完成。所以我们需要一种无需重入即可完成初始流程的方法。

让我们回到上面 post 中的 RecursiveTest 示例,避免无限递归的解决方案是什么?在再次执行 RecursiveMethod 之前,我们需要 RecursiveMethod 完成其流程。一种方法是创建一个队列并将对 RecursiveMethod 的调用排队,如下所示:

public void RecursiveMethod()
{
    if (!_isDone)
    {
        Enqueue(RecursiveMethod);
        _isDone = true;
    }
}  

这样,初始流程将完成,_isDone 将设置为 true,并且在执行下一次对 RecursiveMethod 的调用时,将不再执行任何内容,从而避免无限递归。这几乎就是 Scheduler.CurrentThread 将对您的第二个示例执行的操作。

让我们看看 Dave Sexton 如何解释您的第二个示例的工作原理:

Here, Return is using the CurrentTheadScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return; however, this second subscription to Return schedules its (inner) actions on the trampoline because it's still executing on the OnCompleted callback from the first scheduled (outer) action, thus the repetition does not occur immediately. This allows Repeat to return a disposable to Take, which eventually calls OnCompleted, cancels the repetition by disposing Repeat, and ultimately the call from Subscribe returns.

再次强调,我的示例经过了真正的简化,使其易于理解,但并不是它的工作原理。 Here you can see 调度程序是如何工作的。它使用他们所谓的 Trampoline,它基本上是一个队列,可确保没有重入调用。因此,所有调用都在同一线程上一个接一个地序列化。这样就可以完成初始流程,避免无限重入循环。

希望这更清楚一点:)