RxJava/RxScala Observer.onNext 中的异步代码

RxJava/RxScala async code inside Observer.onNext

假设您要将来自某个流的事件存储到数据库中,并且该数据库的客户端库是异步的。

例如当发出下一个事件时,您必须在观察者的 onNext 内部调用一个方法 writeEvent(event: MyEvent): Future[Boolean]。除了在 Future?

上阻塞之外,还有其他好的方法吗?

我目前看到的关于如何实现这个的唯一方法是创建一些自定义 Scheduler 允许我 return 线程到池,直到异步代码里面 onNext 完成了。

你不想在你的 onNext 订阅者回调中那样阻塞,那会打败 Rx。它可以以更惯用的方式链接在一起。

我不常使用 Futures,但我想知道 Observable.from(Future) 是否可行:

someStream
    .flatMap(evt => Observable.from(writeEvent(evt)))
    .subscribe(
        next => ...,
        err => ...
)