RxPY - 如何使用 stop_and_wait?

RxPY - How to use stop_and_wait?

我尝试翻译 Python 中的以下 JavaScript 代码示例:

import Rx from "rx"

let source = Rx.Observable.interval(1000)
  .timestamp()
  .controlled();

source.stopAndWait().subscribe(
  (result) => console.log("onNext: ", result),
  (error) => console.log("onError: ", error),
  () => console.log("Done!")
);

该片段摘自 RxJS Release Notes。我的 Python 解释如下所示:

from __future__ import print_function
from rx import Observable


source = Observable.interval(1000).timestamp().controlled()
source.stop_and_wait().subscribe(
    on_next=lambda x: print("on_next %s" % x),
    on_error=lambda e: print("on_error %s" % e)
)

不幸的是,虽然 JavaScript 版本工作正常,但 Python 版本失败,因为 'StopAndWaitObservable' 对象没有属性 'subscription'。

针对该问题的修复已合并到 rx 库开发行中。修复后的 Python 版本就像 JavaScript 版本一样工作。