Observable#repeat 在响应式扩展中的奇怪行为

Strange behavior of Observable#repeat in reactive extensions

我正在研究 rx 运算符,很好奇为什么 just(null).repeat() 不能作为任何内置运算符的参数:

Observable.interval(1, TimeUnit.SECONDS)
    .sample(Observable.just(null).repeat())
    .subscribe(System.out::println);

我原以为它会打印 0 1 2 3 ... 但它只是挂起。我想这是因为 repeat 占用了默认值 Scheduler,但是,如果您交换 intervaljust-repeat 的角色,那么它会按预期工作,打印 null 每秒一次:

Observable.just(null).repeat()
    .sample(Observable.interval(1, TimeUnit.SECONDS))
    .subscribe(System.out::println);

这是怎么回事?

如果您不指定调度程序(并且没有操作员设置调度程序),则所有处理都发生在同一个线程上。 just(null).repeat() 将占用 100% 的 CPU 核心,因此其他任何东西都没有机会继续。

在您的情况下,interval 是在 Scedulers.computation() 调度程序上生成的,并且因为它处于开始阶段并且之后没有发生调度程序更改,所以您的 repeat 也在处理同一个线程。

在第二种情况下,所有内容都在同一个线程上订阅,除了间隔,它在自己的调度程序上;其余取决于 sample.

的内部实现

如果您使用特定的调度程序,它应该可以工作:

.sample(Observable.just(null).repeat().subscribeOn(Schedulers.computation()))

请注意,如果您只想使用空值而不是 interval 生成的数字,则更有效的方法是使用 map 而不是 sample:

.map(any -> (Object) null)