使用 RxJava 进行服务器轮询 - 在获得服务器响应后重复
Server polling using RxJava - repeat after geting server response
目前我正在尝试使用 RxJava 实现服务器轮询,我已经研究了如何在收到服务器响应后重复整个链,我尝试过使用 repeat(),它可以工作但不是那么完美,原因是它使 api 调用了很多次,服务器在发送给客户端之前需要额外的时间来处理数据,但我们不知道确切的时间,所以我们无法使用 repeatWhen() 来指定具体时间。我唯一可以使用的是在 api 响应之后等待。
如有任何建议,我们将不胜感激!
以下是代码片段:
retrofitService.requestPolling()
.repeat() // do not wait to call server so many times
.takeUntil(new Func1<PollResponse, Boolean>() {
@Override
public Boolean call(PollResponse pollResponse) {
return pollResponse.mComplete;
}
})
.doOnNext(new Action1<FlightSearchPollResponse>() {
@Override
public void call(pollResponse pollResponse) {
// update UI here
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<PollResponse>() {
@Override
public void onCompleted() {
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(PollResponse pollResponse) {
} );
编辑:我是 RxJava 的新手,刚刚得到这个名为 BackPressure 的主题,并且有很多文章解释了如何处理它,因为我不想缓存这个响应,似乎 Subject 是一个不错的选择,它允许您控制何时拉动。
http://akarnokd.blogspot.com/2015/06/subjects-part-1.html
感谢@Gary LO
应该有很多方法。我想分享其中之一。
- 创建单独的信号流
PublishSubject pollingSignal
- 将信号转换为 api 呼叫
发布信号再做一次。
final PublishSubject<Boolean> pollingSignal = PublishSubject.create();
final Observable<PollResponse> apiResponse = retrofitService.requestPolling();
pollingSignal
.flatMap(x -> apiResponse)
.subscribe(new Observer<PollResponse>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable throwable) {}
@Override
public void onNext(PollResponse integer) {
// start the next polling
pollingSignal.onNext(true);
}
});
// start the first polling
pollingSignal.onNext(true);
玩得开心!
注释
使用 PublishSubject<Boolean>
而不是 PublishSubject<Void>
是因为我不习惯使用 pollingSignal.onNext(null)
.
但在 Kotlin 中,我可以将 PublishSubject<Unit>
与 pollingSignal.onNext(Unit)
一起使用
目前我正在尝试使用 RxJava 实现服务器轮询,我已经研究了如何在收到服务器响应后重复整个链,我尝试过使用 repeat(),它可以工作但不是那么完美,原因是它使 api 调用了很多次,服务器在发送给客户端之前需要额外的时间来处理数据,但我们不知道确切的时间,所以我们无法使用 repeatWhen() 来指定具体时间。我唯一可以使用的是在 api 响应之后等待。
如有任何建议,我们将不胜感激!
以下是代码片段:
retrofitService.requestPolling()
.repeat() // do not wait to call server so many times
.takeUntil(new Func1<PollResponse, Boolean>() {
@Override
public Boolean call(PollResponse pollResponse) {
return pollResponse.mComplete;
}
})
.doOnNext(new Action1<FlightSearchPollResponse>() {
@Override
public void call(pollResponse pollResponse) {
// update UI here
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<PollResponse>() {
@Override
public void onCompleted() {
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(PollResponse pollResponse) {
} );
编辑:我是 RxJava 的新手,刚刚得到这个名为 BackPressure 的主题,并且有很多文章解释了如何处理它,因为我不想缓存这个响应,似乎 Subject 是一个不错的选择,它允许您控制何时拉动。
http://akarnokd.blogspot.com/2015/06/subjects-part-1.html
感谢@Gary LO
应该有很多方法。我想分享其中之一。
- 创建单独的信号流
PublishSubject pollingSignal
- 将信号转换为 api 呼叫
发布信号再做一次。
final PublishSubject<Boolean> pollingSignal = PublishSubject.create(); final Observable<PollResponse> apiResponse = retrofitService.requestPolling(); pollingSignal .flatMap(x -> apiResponse) .subscribe(new Observer<PollResponse>() { @Override public void onCompleted() {} @Override public void onError(Throwable throwable) {} @Override public void onNext(PollResponse integer) { // start the next polling pollingSignal.onNext(true); } }); // start the first polling pollingSignal.onNext(true);
玩得开心!
注释
使用 PublishSubject<Boolean>
而不是 PublishSubject<Void>
是因为我不习惯使用 pollingSignal.onNext(null)
.
但在 Kotlin 中,我可以将 PublishSubject<Unit>
与 pollingSignal.onNext(Unit)