Observable#repeat 在响应式扩展中的奇怪行为
Strange behavior of Observable#repeat in reactive extensions
我正在研究 rx 运算符,很好奇为什么 just(null).repeat()
不能作为任何内置运算符的参数:
Observable.interval(1, TimeUnit.SECONDS)
.sample(Observable.just(null).repeat())
.subscribe(System.out::println);
我原以为它会打印 0 1 2 3 ...
但它只是挂起。我想这是因为 repeat
占用了默认值 Scheduler
,但是,如果您交换 interval
和 just-repeat
的角色,那么它会按预期工作,打印 null
每秒一次:
Observable.just(null).repeat()
.sample(Observable.interval(1, TimeUnit.SECONDS))
.subscribe(System.out::println);
这是怎么回事?
如果您不指定调度程序(并且没有操作员设置调度程序),则所有处理都发生在同一个线程上。 just(null).repeat()
将占用 100% 的 CPU 核心,因此其他任何东西都没有机会继续。
在您的情况下,interval
是在 Scedulers.computation()
调度程序上生成的,并且因为它处于开始阶段并且之后没有发生调度程序更改,所以您的 repeat
也在处理同一个线程。
在第二种情况下,所有内容都在同一个线程上订阅,除了间隔,它在自己的调度程序上;其余取决于 sample
.
的内部实现
如果您使用特定的调度程序,它应该可以工作:
.sample(Observable.just(null).repeat().subscribeOn(Schedulers.computation()))
请注意,如果您只想使用空值而不是 interval
生成的数字,则更有效的方法是使用 map
而不是 sample
:
.map(any -> (Object) null)
我正在研究 rx 运算符,很好奇为什么 just(null).repeat()
不能作为任何内置运算符的参数:
Observable.interval(1, TimeUnit.SECONDS)
.sample(Observable.just(null).repeat())
.subscribe(System.out::println);
我原以为它会打印 0 1 2 3 ...
但它只是挂起。我想这是因为 repeat
占用了默认值 Scheduler
,但是,如果您交换 interval
和 just-repeat
的角色,那么它会按预期工作,打印 null
每秒一次:
Observable.just(null).repeat()
.sample(Observable.interval(1, TimeUnit.SECONDS))
.subscribe(System.out::println);
这是怎么回事?
如果您不指定调度程序(并且没有操作员设置调度程序),则所有处理都发生在同一个线程上。 just(null).repeat()
将占用 100% 的 CPU 核心,因此其他任何东西都没有机会继续。
在您的情况下,interval
是在 Scedulers.computation()
调度程序上生成的,并且因为它处于开始阶段并且之后没有发生调度程序更改,所以您的 repeat
也在处理同一个线程。
在第二种情况下,所有内容都在同一个线程上订阅,除了间隔,它在自己的调度程序上;其余取决于 sample
.
如果您使用特定的调度程序,它应该可以工作:
.sample(Observable.just(null).repeat().subscribeOn(Schedulers.computation()))
请注意,如果您只想使用空值而不是 interval
生成的数字,则更有效的方法是使用 map
而不是 sample
:
.map(any -> (Object) null)