使用 rxJava 和 retryWhen 运算符限制操作

Limit actions with rxJava and retryWhen operator

我的应用程序通常必须做两件事:

我就是这样实现的:

public class RequestsLocker {

    private volatile boolean isLocked;

    public <T> Observable.Transformer<T, T> applyLocker() {
        if(!isLocked()) {
            return observable -> observable
                    .doOnSubscribe(() -> {
                        lockChannel();
                    })
                    .doOnUnsubscribe(() -> {
                        freeChannel();
                    });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
        }
    }

    private void lockChannel() {
        isLocked = true;
    }

    private void freeChannel() {
        isLocked = false;
    }

    public boolean isLocked() {
        return isLocked;
    }

}

看起来不错。

现在我的 retryWhen 实现:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
    return observable.flatMap(error -> {
        // For IOExceptions, we  retry
        if (error instanceof IOException) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }

        // For anything else, don't retry
        return Observable.error(error);
    });
}

我是这样使用的:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
}

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .subscribe(subscriber);
}

主要问题是 doOnUnsubscribe() 调用任何错误,然后储物柜会为任何新请求打开,直到计时器到期并再次重新订阅。那就是问题所在。在计时器计时期间,用户可以发出另一个请求。

我该如何解决?

使用 retryWhen,为避免取消订阅 onError,您必须使用 onErrorResumeNext,它不会取消订阅。

看看这个例子

/**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

还有关于并发,如果你在flatMap算子中进行操作,你可以指定Max concurrent。

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
    if (getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
    }
    return merge(map(func), maxConcurrent);
}

您可以在此处查看更多示例https://github.com/politrons/reactive

我目前的解决方案是不在 IoException 上解锁 RequestLocker,因为在这种情况下请求将在延迟后重复。

public <T> Observable.Transformer<T, T> applyLocker() {
    if(!isLocked()) {
        return observable -> observable.doOnSubscribe(() -> {
            lockChannel();
        }).doOnNext(obj -> {
            freeChannel();
        }).doOnError(throwable -> {
            if(throwable instanceof IOException) {
                return; // as any request will be repeated in case of IOException
            }
            freeChannel(channel);
        });
    } else {
        return observable -> Observable.error(new ChannelBusyException("Channel is busy now"));
    }
}

问题是您将变换器应用于可观察源,即在 retrywhen 之前。 当出现错误时,你总是要取消订阅然后重新订阅源可观察 导致你的 doOnUnsubscribe 被调用。

我建议你试试

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));            
}


public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
            .subscribe(subscriber);
}

PS:apply locker 转换器看起来有点不同,即它在您链接的代码中没有参数。