Limit actions with RxJava and the retryWhen operator
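My application generally has to do two things:
- Accept only one network request at a time
- Retry when a request fails
This is how I implemented it: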
    public class RequestsLocker {
        private volatile boolean isLocked;

        public <T> Observable.Transformer<T, T> applyLocker() {
            if (!isLocked()) {
                return observable -> observable
                        .doOnSubscribe(() -> {
                            lockChannel();
                        })
                        .doOnUnsubscribe(() -> {
                            freeChannel();
                        });
            } else {
                return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
            }
        }

        private void lockChannel() {
            isLocked = true;
        }

        private void freeChannel() {
            isLocked = false;
        }

        public boolean isLocked() {
            return isLocked;
        }
    }
That seems to work fine.
Now my retryWhen implementation:
    public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
        return observable.flatMap(error -> {
            // For IOExceptions, we retry
            if (error instanceof IOException) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
            // For anything else, don't retry
            return Observable.error(error);
        });
    }
This is how I use it:
    public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
        return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
                .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
    }
    ...
    public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                                 String notes, Subscriber<List<QueueCarItem>> subscriber) {
        queueApiMediator.finishService(carItem.getId(), paymentType, notes)
                .subscribeOn(ioScheduler)
                .observeOn(uiScheduler)
                .doOnError(this::handleError)
                .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
                .subscribe(subscriber);
    }
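The main problem is that doOnUnsubscribe() is called on any error, after which the locker is open to any new request until the timer expires and the resubscription happens. That is the problem: while the timer is ticking, the user can make another request.
How can I fix it?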
With retryWhen, to avoid being unsubscribed on onError you have to use onErrorResumeNext, which will not unsubscribe you.
Take a look at this example:
    // Counts how many times retryWhen has been notified of an error.
    private int count = 0;

    /**
     * Shows how onErrorResumeNext works: it emits a fallback item when an
     * error occurs in the pipeline and an exception is propagated.
     */
    @Test
    public void observableOnErrorResumeNext() {
        Subscription subscription = Observable.just(null)
                .map(Object::toString) // throws NullPointerException on the null item
                .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                .retryWhen(errors -> errors.doOnNext(o -> count++)
                                .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)),
                        Schedulers.newThread())
                .onErrorResumeNext(t -> {
                    System.out.println("Error after all retries:" + t.getCause());
                    return Observable.just("I save the world for extinction!");
                })
                .subscribe(s -> System.out.println(s));
        // Wait up to 500 ms so the retries scheduled on the other thread can run
        new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
    }
As for concurrency, if you run the operation inside a flatMap operator, you can specify the maximum concurrency (maxConcurrent).
    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func), maxConcurrent);
    }
You can see more examples here: https://github.com/politrons/reactive
My current solution is not to unlock the RequestsLocker on IOException, since in that case the request will be repeated after the delay.
    public <T> Observable.Transformer<T, T> applyLocker() {
        if (!isLocked()) {
            return observable -> observable.doOnSubscribe(() -> {
                lockChannel();
            }).doOnNext(obj -> {
                freeChannel();
            }).doOnError(throwable -> {
                if (throwable instanceof IOException) {
                    return; // as any request will be repeated in case of IOException
                }
                freeChannel();
            });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now"));
        }
    }
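A side note on the locker itself: both versions of applyLocker() read isLocked() when the transformer is created and only set the flag later, in doOnSubscribe, so two callers that compose at nearly the same moment can both see the channel as free. Below is a minimal sketch of an atomic alternative, in plain Java with no RxJava dependency. AtomicChannelLock and its method names are made up for illustration and are not part of the code above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of the original RequestsLocker.
final class AtomicChannelLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    /** Atomically takes the lock; returns false if it is already held. */
    boolean tryLock() {
        // compareAndSet checks and flips the flag in a single step,
        // so two callers can never both observe "free" and both proceed.
        return locked.compareAndSet(false, true);
    }

    /** Releases the lock; harmless if the lock is not held. */
    void unlock() {
        locked.set(false);
    }

    boolean isLocked() {
        return locked.get();
    }

    public static void main(String[] args) {
        AtomicChannelLock lock = new AtomicChannelLock();
        System.out.println(lock.tryLock()); // true: first caller gets the channel
        System.out.println(lock.tryLock()); // false: second caller is rejected
        lock.unlock();
        System.out.println(lock.tryLock()); // true: free again after unlock
    }
}
```

In an Rx chain, the tryLock() call would presumably go where the subscription actually starts (for instance inside Observable.defer) and unlock() where the whole sequence ends, but that wiring is an assumption and is not shown here.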
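The problem is that you apply the transformer to the source observable, i.e. before the retryWhen.
When an error occurs, retryWhen always unsubscribes from the source observable and then resubscribes to it, which causes your doOnUnsubscribe to be called.
I suggest you try: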
    public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
        return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));
    }

    public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                                 String notes, Subscriber<List<QueueCarItem>> subscriber) {
        queueApiMediator.finishService(carItem.getId(), paymentType, notes)
                .subscribeOn(ioScheduler)
                .observeOn(uiScheduler)
                .doOnError(this::handleError)
                .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
                // Locker applied after retryWhen, so it wraps the whole retry sequence
                .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE))
                .subscribe(subscriber);
    }
PS: the applyLocker transformer looks a bit different, i.e. it takes no arguments in the code you linked.
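To see why the position of the locker relative to retryWhen matters, here is a small plain-Java simulation. It does not use RxJava, and LockPlacementDemo, FlakyRequest and run are hypothetical names. It records whether the lock is held in the gap between a failed attempt and the retry, which is where the 2-second timer would run.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java simulation of the two placements; all names are hypothetical.
final class LockPlacementDemo {

    /** A request that fails with IOException a fixed number of times, then succeeds. */
    static final class FlakyRequest {
        private int failuresLeft;

        FlakyRequest(int failures) {
            this.failuresLeft = failures;
        }

        String call() throws IOException {
            if (failuresLeft-- > 0) {
                throw new IOException("network down");
            }
            return "done";
        }
    }

    /**
     * Runs the request, retrying on IOException, and returns the lock state
     * seen in each gap between attempts (where the retry delay would be).
     * lockPerAttempt = true mimics a locker inside the retried source;
     * lockPerAttempt = false mimics a locker around the whole retry sequence.
     */
    static List<Boolean> run(boolean lockPerAttempt, int failures) {
        AtomicBoolean locked = new AtomicBoolean(false);
        List<Boolean> lockedDuringDelay = new ArrayList<>();
        FlakyRequest request = new FlakyRequest(failures);
        if (!lockPerAttempt) {
            locked.set(true); // lock once, before the first attempt
        }
        try {
            while (true) {
                if (lockPerAttempt) {
                    locked.set(true); // lock again on every (re)subscription
                }
                try {
                    request.call();
                    return lockedDuringDelay;
                } catch (IOException e) {
                    if (lockPerAttempt) {
                        locked.set(false); // failed attempt is torn down, lock is freed
                    }
                    // What a second caller would observe while the delay runs.
                    lockedDuringDelay.add(locked.get());
                }
            }
        } finally {
            locked.set(false); // released when the whole sequence ends
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true, 2));  // [false, false]
        System.out.println(run(false, 2)); // [true, true]
    }
}
```

With the lock taken per attempt, the flag is free in every gap. With the lock taken once around the whole retry loop, it stays held until the request finally succeeds or fails for good.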