RxJava/RxScala Observer.onNext 中的异步代码
RxJava/RxScala async code inside Observer.onNext
假设您要将来自某个流的事件存储到数据库中,并且该数据库的客户端库是异步的。
例如当发出下一个事件时,您必须在观察者的 onNext
内部调用一个方法 writeEvent(event: MyEvent): Future[Boolean]
。除了在 Future
?
上阻塞之外,还有其他好的方法吗?
我目前看到的关于如何实现这个的唯一方法是创建一些自定义 Scheduler
允许我 return 线程到池,直到异步代码里面 onNext
完成了。
你不想在你的 onNext
订阅者回调中那样阻塞,那会打败 Rx。它可以以更惯用的方式链接在一起。
我不常使用 Futures,但我想知道 Observable.from(Future)
是否可行:
someStream
.flatMap(evt => Observable.from(writeEvent(evt)))
.subscribe(
next => ...,
err => ...
)
假设您要将来自某个流的事件存储到数据库中,并且该数据库的客户端库是异步的。
例如当发出下一个事件时,您必须在观察者的 onNext
内部调用一个方法 writeEvent(event: MyEvent): Future[Boolean]
。除了在 Future
?
我目前看到的关于如何实现这个的唯一方法是创建一些自定义 Scheduler
允许我 return 线程到池,直到异步代码里面 onNext
完成了。
你不想在你的 onNext
订阅者回调中那样阻塞,那会打败 Rx。它可以以更惯用的方式链接在一起。
我不常使用 Futures,但我想知道 Observable.from(Future)
是否可行:
someStream
.flatMap(evt => Observable.from(writeEvent(evt)))
.subscribe(
next => ...,
err => ...
)