Polling a backend API at a regular interval for a fixed number of times - Retrofit & RxJava
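I want to poll a backend call at a predefined, fixed interval for a certain number of times. If I receive the expected payload at some point during the loop, I want to exit the loop and update the UI; otherwise polling should simply terminate once the attempts are used up.
Below is the code I normally use when making a standard HTTP call.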
// Response model from the backend API
public class ApplicationStatusResponse
{
    public boolean isActive;
}
// Retrofit facade
@POST("v1/api/applicationStatus")
Single<ApplicationStatusResponse> checkApplicationStatus(@Body ApplicationStatusRequest applicationRequest);
-----
DisposableSingleObserver<ApplicationStatusResponse> disposableSingleObserver = new DisposableSingleObserver<ApplicationStatusResponse>() {
    @Override
    public void onSuccess(ApplicationStatusResponse response) {
        // Update UI here
    }
    @Override
    public void onError(Throwable e) {
    }
};
CompositeDisposable compositeDisposable = new CompositeDisposable();
// The following call always works
DisposableSingleObserver<ApplicationStatusResponse> disposable = originationRepo.checkApplicationStatus(applicationStatusRequest)
        .observeOn(schedulerProvider.mainThread())
        .subscribeWith(disposableSingleObserver);
compositeDisposable.add(disposable);
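However, I am lost with the syntax errors in the code below. I cannot reuse the same disposableSingleObserver when the call is driven by Flowable.interval, and I need help with my use case: the UI status has to be updated periodically until either the time has elapsed or the status is active, whichever happens first. Also, if I receive an HTTP 500 status code, I do not want polling to terminate; it should keep repeating until one of the conditions above is met.
// Help needed here: I need polling at a regular interval, but Android Studio complains about syntax errors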
int INITIAL_DELAY = 0;
int POLLING_INTERVAL = 1000;
int POLL_COUNT = 8;
disposable = Flowable
        .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
        .map(x -> originationRepo.checkApplicationStatus(applicationStatusRequest))
        .take(POLL_COUNT) ??
// How can I receive the response payload and update the UI?
compositeDisposable.add(disposable);
Thanks in advance for your help.
Option #1: use filter + take(1)
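disposable = Flowable
        .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
        .take(POLL_COUNT) // yes: caps the number of polls
        // flatMapSingle (not map) subscribes to the Single returned by Retrofit
        .flatMapSingle(x -> originationRepo.checkApplicationStatus(applicationStatusRequest))
        .observeOn(schedulerProvider.mainThread())
        .doOnNext(response -> { /* update UI here */ })
        // stop condition: an active response passes the filter and take(1) ends the stream
        .filter(response -> response.isActive)
        .take(1)
        .subscribe(response -> { /* status is active */ }, error -> { /* handle error */ });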
Option #2: use a Subject + takeUntil
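Subject<Boolean> stopSubject = PublishSubject.create();
disposable = Flowable
        .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
        // Flowable.takeUntil expects a Publisher, so the Subject is converted first
        .takeUntil(stopSubject.toFlowable(BackpressureStrategy.LATEST))
        .take(POLL_COUNT) // yes: caps the number of polls
        // flatMapSingle (not map) subscribes to the Single returned by Retrofit
        .flatMapSingle(x -> originationRepo.checkApplicationStatus(applicationStatusRequest))
        .observeOn(schedulerProvider.mainThread())
        .subscribe(
                response -> {
                    // update UI
                    boolean shouldStop = response.isActive; // stop condition taken from the question
                    if (shouldStop) {
                        stopSubject.onNext(true);
                    }
                },
                error -> { /* handle error */ }
        );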
P.S. This is pseudocode; I hope you get the idea.
(Continuing from the previous answer: you can also "short-circuit" the stream by throwing a custom Error/Exception.)
Option 3:
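disposable = Flowable
        .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
        .take(POLL_COUNT) // yes: caps the number of polls
        // flatMapSingle rather than map, so the Single from Retrofit is actually subscribed
        .flatMapSingle(x -> originationRepo.checkApplicationStatus(applicationStatusRequest))
        .doOnNext(response -> { /* update UI here */ })
        // checkCondition() and data() are placeholders for your own response accessors
        .map(response -> {
            if (!response.checkCondition()) {
                throw new ShortCircuitException();
            }
            return response.data();
        })
        // Flowable.onErrorResumeNext needs a Publisher, hence Flowable.empty()/Flowable.error()
        .onErrorResumeNext(throwable -> (throwable instanceof ShortCircuitException)
                ? Flowable.empty()
                : Flowable.error(throwable))
        .subscribe(data -> { /* use data */ }, error -> { /* handle other errors */ });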
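To make the stop rules from the question explicit, here is a minimal plain-Java sketch of the control flow the options above are trying to express: poll at most a fixed number of times, stop as soon as the status is active, and treat a failed call (for example an HTTP 500) as "not active yet" rather than as a reason to stop. It deliberately does not use RxJava or Retrofit. PollingSketch, Outcome and poll are made-up names for illustration, and the Supplier stands in for the real checkApplicationStatus call.

```java
import java.util.function.Supplier;

public class PollingSketch {

    /** How a polling run ended (hypothetical type, for illustration only). */
    public enum Outcome { ACTIVE, EXHAUSTED, INTERRUPTED }

    /**
     * Calls statusCheck up to maxPolls times, waiting intervalMillis between calls.
     * Returns ACTIVE on the first call that reports true, EXHAUSTED if none does.
     * A call that throws is treated as "not active yet" and polling continues.
     */
    public static Outcome poll(Supplier<Boolean> statusCheck, int maxPolls, long intervalMillis) {
        for (int attempt = 0; attempt < maxPolls; attempt++) {
            if (attempt > 0) {
                try {
                    Thread.sleep(intervalMillis); // no delay before the first call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return Outcome.INTERRUPTED;
                }
            }
            try {
                if (statusCheck.get()) {
                    return Outcome.ACTIVE; // expected payload received: stop early
                }
            } catch (RuntimeException e) {
                // Assumption: a failed call (e.g. HTTP 500) must not end the polling.
            }
        }
        return Outcome.EXHAUSTED; // attempts used up without an active status
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated backend: first call fails, second is inactive, third is active.
        Outcome outcome = poll(() -> {
            int n = ++calls[0];
            if (n == 1) {
                throw new IllegalStateException("simulated HTTP 500");
            }
            return n == 3;
        }, 8, 10);
        System.out.println(outcome + " after " + calls[0] + " calls"); // ACTIVE after 3 calls
    }
}
```

In the RxJava versions the same three outcomes correspond to an active response ending the stream early, take(POLL_COUNT) completing the stream, and disposal via compositeDisposable. Note that Option 3 as written forwards non-short-circuit errors to the subscriber instead of swallowing them, so it would need an extra error-handling step on the inner Single to match the "keep polling on HTTP 500" requirement.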