RxJava:在 HTTP 请求期间创建一个 Observable,在请求 returns 之后保持 运行

RxJava: Create an Observable during HTTP Request that keeps running after request returns

我有一个 HTTP 请求触发了一个长 运行 任务(对另一个服务的多个 HTTP 请求),该任务应该在后台完成,而原始请求完成。

所以我做的是

public void triggerWork(@RequestBody SomeObject somObject) {
    return new ResponseEntity<>(startWorkAndReturn(somObject), HttpStatus.OK);
  }

public void startWorkAndReturn(SomeObject someObject) {
    Observable.create(observableEmitter -> {
      // do the work with someObject here and at some time call
      observableEmitter.onNext("result");
    }).subscribe(new Observer<Object>() {
      @Override
      public void onSubscribe(Disposable disposable) {

      }

      @Override
      public void onNext(Object o) {
        // called at some unknown time
      }

      @Override
      public void onError(Throwable throwable) {

      }

      @Override
      public void onComplete() {
        // currently not used as all the work is done in onNext but maybe that's a mistake
      }
    });
    return;
  }

但这似乎会在所有工作完成之前阻止请求。这对我来说已经很奇怪了,因为我从不调用 onComplete,这本身可能是一个错误。但是,我仍然想知道如何创建一个在触发后台工作人员后立即 returns 的请求。

Flowables 是这里的解决方案吗?无论如何,我将重构这些以处理背压。或者我是否需要创建一个后台工作线程?这里的最佳做法是什么?

谢谢

我会使用 Observable.fromCallable{},因为您只需要发出单个事件。这将处理 onCompleate 调用。根据您分享的信息,我不知道您如何正确处理一次性用品。您应该添加 subscribeOn() 和 observeOn() 运算符,它们将定义应在哪个线程上处理 'work' 并观察结果。

文档参考:

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#fromCallable-java.util.concurrent.Callable-

http://reactivex.io/documentation/operators/subscribeon.html

http://reactivex.io/documentation/operators/observeon.html