RxJava:在 HTTP 请求期间创建一个 Observable,在请求 returns 之后保持 运行
RxJava: Create an Observable during HTTP Request that keeps running after request returns
我有一个 HTTP 请求触发了一个长 运行 任务(对另一个服务的多个 HTTP 请求),该任务应该在后台完成,而原始请求完成。
所以我做的是
public void triggerWork(@RequestBody SomeObject somObject) {
return new ResponseEntity<>(startWorkAndReturn(somObject), HttpStatus.OK);
}
public void startWorkAndReturn(SomeObject someObject) {
Observable.create(observableEmitter -> {
// do the work with someObject here and at some time call
observableEmitter.onNext("result");
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Object o) {
// called at some unknown time
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
// currently not used as all the work is done in onNext but maybe that's a mistake
}
});
return;
}
但这似乎会在所有工作完成之前阻止请求。这对我来说已经很奇怪了,因为我从不调用 onComplete,这本身可能是一个错误。但是,我仍然想知道如何创建一个在触发后台工作人员后立即 returns 的请求。
Flowables 是这里的解决方案吗?无论如何,我将重构这些以处理背压。或者我是否需要创建一个后台工作线程?这里的最佳做法是什么?
谢谢
我会使用 Observable.fromCallable{},因为您只需要发出单个事件。这将处理 onCompleate 调用。根据您分享的信息,我不知道您如何正确处理一次性用品。您应该添加 subscribeOn() 和 observeOn() 运算符,它们将定义应在哪个线程上处理 'work' 并观察结果。
文档参考:
http://reactivex.io/documentation/operators/subscribeon.html
我有一个 HTTP 请求触发了一个长 运行 任务(对另一个服务的多个 HTTP 请求),该任务应该在后台完成,而原始请求完成。
所以我做的是
public void triggerWork(@RequestBody SomeObject somObject) {
return new ResponseEntity<>(startWorkAndReturn(somObject), HttpStatus.OK);
}
public void startWorkAndReturn(SomeObject someObject) {
Observable.create(observableEmitter -> {
// do the work with someObject here and at some time call
observableEmitter.onNext("result");
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Object o) {
// called at some unknown time
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
// currently not used as all the work is done in onNext but maybe that's a mistake
}
});
return;
}
但这似乎会在所有工作完成之前阻止请求。这对我来说已经很奇怪了,因为我从不调用 onComplete,这本身可能是一个错误。但是,我仍然想知道如何创建一个在触发后台工作人员后立即 returns 的请求。
Flowables 是这里的解决方案吗?无论如何,我将重构这些以处理背压。或者我是否需要创建一个后台工作线程?这里的最佳做法是什么?
谢谢
我会使用 Observable.fromCallable{},因为您只需要发出单个事件。这将处理 onCompleate 调用。根据您分享的信息,我不知道您如何正确处理一次性用品。您应该添加 subscribeOn() 和 observeOn() 运算符,它们将定义应在哪个线程上处理 'work' 并观察结果。
文档参考:
http://reactivex.io/documentation/operators/subscribeon.html