如何使用 RxScala/RxJava 从单个计算步骤构建 Observable?
How to build an Observable out of individual computation steps with RxScala/RxJava?
我目前有以下代码:
def method(): Future[State] = Future {
// some processing
State.Completed
}
但现在我注意到我实际上想要 "publish" 一组中间状态:
def method(): Observable[State] = ? {
// some processing
publish State.State1
// some processing
publish State.State2
// some processing
publish State.Completed
}
有没有简单的方法可以做到这一点?尽管我将其描述为 3 个状态转换,但实际上我可能会经历更多或更少的转换。我希望从 Future 到 Observable 的变化意味着对我当前 "imperative" 代码的最少更改。
此外,我希望这些 "events" 能够实时发布,而不仅仅是在从方法返回时发布。
使用Observable.create
并在以下任何时候推送下一个状态:
Observable<State> stateSource = Observable.create(emitter -> {
// some processing
emitter.onNext(State.State1);
// some processing
emitter.onNext(State.State2);
// some processing
emitter.onNext(State.Completed);
// no further state changes
emitter.onComplete();
});
我目前有以下代码:
def method(): Future[State] = Future {
// some processing
State.Completed
}
但现在我注意到我实际上想要 "publish" 一组中间状态:
def method(): Observable[State] = ? {
// some processing
publish State.State1
// some processing
publish State.State2
// some processing
publish State.Completed
}
有没有简单的方法可以做到这一点?尽管我将其描述为 3 个状态转换,但实际上我可能会经历更多或更少的转换。我希望从 Future 到 Observable 的变化意味着对我当前 "imperative" 代码的最少更改。
此外,我希望这些 "events" 能够实时发布,而不仅仅是在从方法返回时发布。
使用Observable.create
并在以下任何时候推送下一个状态:
Observable<State> stateSource = Observable.create(emitter -> {
// some processing
emitter.onNext(State.State1);
// some processing
emitter.onNext(State.State2);
// some processing
emitter.onNext(State.Completed);
// no further state changes
emitter.onComplete();
});