`observeOn` 和参数化调度器之间的区别
Difference between `observeOn` and parametrized Scheduler
我希望以下两段代码是等价的。 Repeat 默认使用 currentThread
调度器。如果我们将它更改为 immediate
调度程序:
Rx.Observable.fromArray([1,2,3,4,5]).flatMap(a => {
return Rx.Observable.repeat(a, 3, Rx.Scheduler.immediate)
})
.subscribe(r => console.log(r));
上面的代码产生了预期的结果:1, 1, 1, 2, 2, 2, ...
。但是下面的代码没有,并产生了一系列混合值:
Rx.Observable.fromArray([1,2,3,4,5]).flatMap(a => {
return Rx.Observable.repeat(a, 3).observeOn(Rx.Scheduler.immediate)
})
.subscribe(r => console.log(r));
我不明白这种行为,但我想我漏掉了什么。 repeat
可以传递一个 Scheduler 参数,但我想我也可以通过使用 observeOn
在特定的 Scheduler 上强制 Observable。我错过了什么?
不同之处在于,一种使用调度程序进行生成,第二种仅使用它进行传播。
在第二个版本中,您仍然使用 currentThread
来创建值。 observeOn
只会在值从前一个运算符发出后 将值强制到不同的调度程序 但对于生成事件的运算符,这不会影响这些事件的生成。
如果你查看一些创建运算符(如 fromArray
)的内部,你会看到如下内容:
//Changing the scheduler will change how recursive scheduling works
scheduler.schedulerRecursiveWithState(0, function(self, state) {
if (i < len) {
observer.onNext(array[i]);
//Schedule the next event
self(i + 1);
} else {
observer.onCompleted();
}
});
而 observeOn
类似于做这样的事情:
//Doesn't change when events get generated, simply reschedules them for down stream
source.subscribe(function(x) {
scheduler.scheduleWithState(x, function(self, state) {
observer.onNext(x);
});
});
我希望以下两段代码是等价的。 Repeat 默认使用 currentThread
调度器。如果我们将它更改为 immediate
调度程序:
Rx.Observable.fromArray([1,2,3,4,5]).flatMap(a => {
return Rx.Observable.repeat(a, 3, Rx.Scheduler.immediate)
})
.subscribe(r => console.log(r));
上面的代码产生了预期的结果:1, 1, 1, 2, 2, 2, ...
。但是下面的代码没有,并产生了一系列混合值:
Rx.Observable.fromArray([1,2,3,4,5]).flatMap(a => {
return Rx.Observable.repeat(a, 3).observeOn(Rx.Scheduler.immediate)
})
.subscribe(r => console.log(r));
我不明白这种行为,但我想我漏掉了什么。 repeat
可以传递一个 Scheduler 参数,但我想我也可以通过使用 observeOn
在特定的 Scheduler 上强制 Observable。我错过了什么?
不同之处在于,一种使用调度程序进行生成,第二种仅使用它进行传播。
在第二个版本中,您仍然使用 currentThread
来创建值。 observeOn
只会在值从前一个运算符发出后 将值强制到不同的调度程序 但对于生成事件的运算符,这不会影响这些事件的生成。
如果你查看一些创建运算符(如 fromArray
)的内部,你会看到如下内容:
//Changing the scheduler will change how recursive scheduling works
scheduler.schedulerRecursiveWithState(0, function(self, state) {
if (i < len) {
observer.onNext(array[i]);
//Schedule the next event
self(i + 1);
} else {
observer.onCompleted();
}
});
而 observeOn
类似于做这样的事情:
//Doesn't change when events get generated, simply reschedules them for down stream
source.subscribe(function(x) {
scheduler.scheduleWithState(x, function(self, state) {
observer.onNext(x);
});
});