RxJava 中的指数退避
Exponential Backoff in RxJava
我有一个 API 需要一个 Observable
来触发一个事件。
我想要 return 一个 Observable
如果检测到 Internet 连接则每 defaultDelay
秒发出一个值,如果没有连接则延迟 numberOfFailedAttempts^2
次。
我尝试了很多不同的风格,我遇到的最大问题是 retryWhen's
observable 只被评估一次:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
有什么方法可以做我想做的事吗?我发现了一个相关问题(现在无法找到它),但所采用的方法似乎不适用于动态值。
您可以使用retryWhen
运算符来配置无连接时的延迟。如何定期发出项目是一个单独的主题(查找 interval
或 timer
运算符)。如果您无法弄清楚,请打开一个单独的问题。
我在 Github 上有一个广泛的例子,但我会在这里给你要点。
RetryWithDelay retryWithDelay = RetryWithDelay.builder()
.retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
.build()
Single.fromCallable(() -> {
...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
...
})
RetryWithDelay
定义如下。我用的是 RxJava 2.x,所以如果你用的是 1.x,签名应该是 Func1<Observable<? extends Throwable>, Observable<Object>>
.
public class RetryWithDelay implements
Function<Flowable<? extends Throwable>, Publisher<Object>> {
...
}
RetryWithDelay class.
这让我可以根据 RetryDelayStrategy
配置各种超时,恒定的、线性的、指数的。对于您的用例,您会选择 CONSTANT_DELAY_TIMES_RETRY_COUNT
延迟策略并在构建 RetryWithDelay
.
时调用 retryDelaySeconds(2)
retryWhen
是一个复杂的,甚至可能是错误的运算符。您可以在网上找到的大多数示例都使用 range
运算符,如果没有重试,该运算符将失败。详情请看我的回答。
我一直发现 retryWhen
有点 low-level 所以对于指数退避,我使用了一个构建器(比如 Abhijit),它经过单元测试并且可用于 RxJava 1.x rxjava-extras。我建议使用上限版本,这样延迟的指数增长就不会超过您定义的最大值。
你是这样使用它的:
observable.retryWhen(
RetryWhen.exponentialBackoff(
delay, maxDelay, TimeUNIT.SECONDS)
.build());
我不同意 retryWhen
是错误的,但如果你发现错误,请将其报告给 RxJava。错误修复得很快!
你需要 rxjava-extras 0.8.0.6 或更高版本,它在 Maven Central 上:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.8.0.6</version>
</dependency>
如果您需要 RxJava 2.x 版本,请告诉我。从 0.1.4 开始,rxjava2-extras 中提供了相同的功能。
你的代码有两个错误:
- 为了重复某些可观察序列,该序列必须是有限的。 IE。而不是
interval
你最好使用 just
或 fromCallable
之类的东西,就像我在下面的示例中所做的那样。
- 从
repeatWhen
的内部函数您需要 return 新的延迟可观察源,因此您必须 return Observable.timer()
而不是 observable.delay()
。
工作代码:
public void testRepeat() throws InterruptedException {
logger.info("test start");
int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));
Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}
查看有关 repeatWhen
here 的详细文章。
我有一个 API 需要一个 Observable
来触发一个事件。
我想要 return 一个 Observable
如果检测到 Internet 连接则每 defaultDelay
秒发出一个值,如果没有连接则延迟 numberOfFailedAttempts^2
次。
我尝试了很多不同的风格,我遇到的最大问题是 retryWhen's
observable 只被评估一次:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
有什么方法可以做我想做的事吗?我发现了一个相关问题(现在无法找到它),但所采用的方法似乎不适用于动态值。
您可以使用retryWhen
运算符来配置无连接时的延迟。如何定期发出项目是一个单独的主题(查找 interval
或 timer
运算符)。如果您无法弄清楚,请打开一个单独的问题。
我在 Github 上有一个广泛的例子,但我会在这里给你要点。
RetryWithDelay retryWithDelay = RetryWithDelay.builder()
.retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
.build()
Single.fromCallable(() -> {
...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
...
})
RetryWithDelay
定义如下。我用的是 RxJava 2.x,所以如果你用的是 1.x,签名应该是 Func1<Observable<? extends Throwable>, Observable<Object>>
.
public class RetryWithDelay implements
Function<Flowable<? extends Throwable>, Publisher<Object>> {
...
}
RetryWithDelay class.
这让我可以根据 RetryDelayStrategy
配置各种超时,恒定的、线性的、指数的。对于您的用例,您会选择 CONSTANT_DELAY_TIMES_RETRY_COUNT
延迟策略并在构建 RetryWithDelay
.
retryDelaySeconds(2)
retryWhen
是一个复杂的,甚至可能是错误的运算符。您可以在网上找到的大多数示例都使用 range
运算符,如果没有重试,该运算符将失败。详情请看我的回答
我一直发现 retryWhen
有点 low-level 所以对于指数退避,我使用了一个构建器(比如 Abhijit),它经过单元测试并且可用于 RxJava 1.x rxjava-extras。我建议使用上限版本,这样延迟的指数增长就不会超过您定义的最大值。
你是这样使用它的:
observable.retryWhen(
RetryWhen.exponentialBackoff(
delay, maxDelay, TimeUNIT.SECONDS)
.build());
我不同意 retryWhen
是错误的,但如果你发现错误,请将其报告给 RxJava。错误修复得很快!
你需要 rxjava-extras 0.8.0.6 或更高版本,它在 Maven Central 上:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.8.0.6</version>
</dependency>
如果您需要 RxJava 2.x 版本,请告诉我。从 0.1.4 开始,rxjava2-extras 中提供了相同的功能。
你的代码有两个错误:
- 为了重复某些可观察序列,该序列必须是有限的。 IE。而不是
interval
你最好使用just
或fromCallable
之类的东西,就像我在下面的示例中所做的那样。 - 从
repeatWhen
的内部函数您需要 return 新的延迟可观察源,因此您必须 returnObservable.timer()
而不是observable.delay()
。
工作代码:
public void testRepeat() throws InterruptedException {
logger.info("test start");
int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));
Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}
查看有关 repeatWhen
here 的详细文章。