为什么 RxPY Observable 充当无限迭代器?

Why does an RxPY Observable act as an infinite iterable?

我有一个错误,它不小心将 Observable 用作可迭代对象。对于大多数对象,这通常很容易检测到:

>>> tuple(object())

Traceback (most recent call last):
  File "C:\Program Files (x86)\Python27\lib\site-packages\IPython\core\interactiveshell.py", line 3035, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-4-40e3dfc60da8>", line 1, in <module>
    tuple(object())
TypeError: 'object' object is not iterable

但是,对于 Rx observable,它会悄无声息地崩溃 Python:


MWE:

from rx import Observable

observable = Observable.from_list([1,2,3])
tuple(observable)  # Python will die silently here

没有回溯,也没有迹象表明有任何问题。这使得已经难以调试的并发响应式代码更加难以调试——我花了 2 个小时才最终找到这个代码。


经过仔细检查,迭代 Observable 似乎创建了新的可观察对象,尽管从哪里我不知道,因为可观察对象没有 __iter__ 方法。

>>> for i, x in enumerate(observable):
>>>     print x
>>>     if i > 100:  # To prevent Python from crashing
>>>         break

<rx.anonymousobservable.AnonymousObservable object at 0x03111710>
<rx.anonymousobservable.AnonymousObservable object at 0x03111850>
<rx.anonymousobservable.AnonymousObservable object at 0x03111990>
<rx.anonymousobservable.AnonymousObservable object at 0x03111AD0>
<rx.anonymousobservable.AnonymousObservable object at 0x03111C10>
<rx.anonymousobservable.AnonymousObservable object at 0x03111D50>
<rx.anonymousobservable.AnonymousObservable object at 0x03111E90>
etc...

这是错误还是功能? Observable 是可迭代的吗?

我 运行 遇到了同样的问题,实际上我不小心将一个 observable 变成了一个列表,这让我的电脑崩溃了,这有点可笑。

我不是专家,一周前我就被扔进了深渊,但这似乎是一个特点。默认情况下,Observable 是非阻塞的,所以如果那里什么都没有,它只是 return 一个空的 Observable。这就像用 O_NONBLOCK 标志打开的文件:立即用空字符串读取调用 returns。

如果您想将其用作阻塞迭代器,请使用 to_blocking()。然后您可以执行以下操作:

from rx import Observable
o = Observable.from_([1, 2, 3, 4])
i = iter(o.to_blocking())
list(i)

它会完美运行。

编辑:我刚刚发现为什么会这样。 python 的迭代器协议和 Observable 的方法之一,next() 方法之间存在冲突。文档说:

next() — returns an iterable that blocks until the Observable emits another item, then returns that item.

在python土地上,next()预计return可迭代的下一个项目。因此,您将获得无限供应的可观察对象,每个对象都承诺在未来某个日期 return 一个项目。

Observable 是可迭代的。这种行为实际上记录在案 here:

Observable sequences may be turned into an iterator so you can use generator expressions, or iterate over them (uses queueing and blocking).

xs = Observable.from_([1,2,3,4,5,6])
ys = xs.to_blocking()
zs = (x*x for x in ys if x > 3)
for x in zs:
    print(x)