RxPy - 为什么排放与合并运算符交错?
RxPy - Why are emissions interleaved with merging operators?
所以我在学习 RxJava 和 RxKotlin 两年后开始学习 RxPy。我注意到的一件事是某些运算符会导致疯狂的交错,而这在 RxJava 中不会发生。
例如,flat_map()
将导致发射在简单的 Observable
源中乱序交错。
items = Observable.from_( ("Alpha","Beta","Gamma","Delta","Epsilon"))
items.flat_map(lambda s: Observable.from_(list(s))).subscribe(print)
输出:
A
l
B
p
e
G
h
t
a
D
a
a
m
e
E
m
l
p
a
t
s
a
i
l
o
n
但是,对于 RxJava 或 RxKotlin,一切都保持顺序和有序。
fun main(args: Array<String>) {
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
.flatMap {
Observable.from(it.toCharArray().asIterable())
}.subscribe(::println)
}
输出:
A
l
p
h
a
B
e
t
a
G
a
m
m
a
D
e
l
t
a
E
p
s
i
l
o
n
我确认 MainThread
上的一切都是 运行 并且没有奇怪的异步调度正在进行(我认为)。
为什么 RxPy 会这样?我注意到几乎任何处理多个 Observable
源合并在一起的运算符都会发生这种情况。默认调度程序到底在做什么?
还有,为什么RxPy里没有concat_map()
?我得到的印象是,这在某种程度上是不可能的,因为调度是如何工作的...
如前所述,flatMap
不保证顺序。 RxPy 没有将 concat_map
实现为不同的运算符,但您可以使用 map
和 concat_all
运算符
获得相同的效果
Observable.from_( ("Alpha","Beta","Gamma","Delta","Epsilon"))\
.map(lambda s: Observable.from_(list(s)))\
.concat_all()\
.subscribe(print)
所以我在学习 RxJava 和 RxKotlin 两年后开始学习 RxPy。我注意到的一件事是某些运算符会导致疯狂的交错,而这在 RxJava 中不会发生。
例如,flat_map()
将导致发射在简单的 Observable
源中乱序交错。
items = Observable.from_( ("Alpha","Beta","Gamma","Delta","Epsilon"))
items.flat_map(lambda s: Observable.from_(list(s))).subscribe(print)
输出:
A
l
B
p
e
G
h
t
a
D
a
a
m
e
E
m
l
p
a
t
s
a
i
l
o
n
但是,对于 RxJava 或 RxKotlin,一切都保持顺序和有序。
fun main(args: Array<String>) {
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
.flatMap {
Observable.from(it.toCharArray().asIterable())
}.subscribe(::println)
}
输出:
A
l
p
h
a
B
e
t
a
G
a
m
m
a
D
e
l
t
a
E
p
s
i
l
o
n
我确认 MainThread
上的一切都是 运行 并且没有奇怪的异步调度正在进行(我认为)。
为什么 RxPy 会这样?我注意到几乎任何处理多个 Observable
源合并在一起的运算符都会发生这种情况。默认调度程序到底在做什么?
还有,为什么RxPy里没有concat_map()
?我得到的印象是,这在某种程度上是不可能的,因为调度是如何工作的...
如前所述,flatMap
不保证顺序。 RxPy 没有将 concat_map
实现为不同的运算符,但您可以使用 map
和 concat_all
运算符
Observable.from_( ("Alpha","Beta","Gamma","Delta","Epsilon"))\
.map(lambda s: Observable.from_(list(s)))\
.concat_all()\
.subscribe(print)