如何使用 RxScala/RxJava 从单个计算步骤构建 Observable?

How to build an Observable out of individual computation steps with RxScala/RxJava?

我目前有以下代码:

def method(): Future[State] = Future {
  // some processing
  State.Completed
}

但现在我注意到我实际上想要 "publish" 一组中间状态:

def method(): Observable[State] = ? {
  // some processing
  publish State.State1
  // some processing
  publish State.State2
  // some processing
  publish State.Completed
}

有没有简单的方法可以做到这一点?尽管我将其描述为 3 个状态转换,但实际上我可能会经历更多或更少的转换。我希望从 Future 到 Observable 的变化意味着对我当前 "imperative" 代码的最少更改。

此外,我希望这些 "events" 能够实时发布,而不仅仅是在从方法返回时发布。

使用Observable.create并在以下任何时候推送下一个状态:

Observable<State> stateSource = Observable.create(emitter -> {
     // some processing
     emitter.onNext(State.State1);

     // some processing
     emitter.onNext(State.State2);

     // some processing
     emitter.onNext(State.Completed);

     // no further state changes
     emitter.onComplete();
});