将 timeout() 与 retryWhen() 结合使用
Combining timeout() with retryWhen()
我正在创建一个简单的应用程序,用于使用 RxAndroidBle 库连接蓝牙设备(为出色的工作干杯!)。
我遇到的情况有时是当我连接到设备时收到状态为 133 的 Gatt 错误。
我知道它可能会发生,所以我想做的是在发生该错误时重试所有操作。
这不是问题,我可以使用 retryWhen()
运算符轻松做到这一点,但是我还有另一个要求 - 流必须在之后终止
30 秒(如果连接不成功)。我为此使用 timeout()
,但问题是当我重试时,计时器再次启动。
所以问题是如何将 timeout() 运算符与 retryWhen() 结合使用,这样我就可以重试某些特定错误但保持计数器继续运行。
我有一些关于组合可观察对象的想法,或者一些单独的可观察对象,它们将在超时期限后检查连接状态,但我
想知道我是否可以在单个可观察对象中做到这一点。
到目前为止我的代码如下所示:
public Observable<ConnectingViewState> connectToDevice(String macAddress) {
final RxBleDevice rxBleDevice = rxBleClient.getBleDevice(macAddress);
return rxBleDevice.establishConnection(false)
.subscribeOn(Schedulers.io())
.map(rxBleConnection -> new ConnectingViewState.ConnectedViewState(rxBleConnection))
.cast(ConnectingViewState.class)
.timeout(40, TimeUnit.SECONDS)
.startWith(new ConnectingViewState.ConnectingInProgressViewState())
.retryWhen(errors -> errors.flatMap(error -> {
if (isDefaultGattError(error)) {
return Observable.just(new Object());
} else {
return Observable.error(error);
}
}
))
.onErrorReturn(throwable -> new ConnectingViewState.ErrorState(throwable));
}
正如所讨论的,我已经用 RxJava2 编写了一个测试。代码取自书 'Reactive Programming with RxJava'(第 257 页)
private final static int ATTEMPTS = 10;
@Test
public void name() throws Exception {
Subject<Integer> establishConnection = PublishSubject.create();
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timeout = establishConnection.
retryWhen(failures -> failures
.zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
{
// check here for your error if(...)
if (attempt < ATTEMPTS) {
long expDelay = (long) Math.pow(2, attempt - 2);
return Observable.timer(expDelay, TimeUnit.SECONDS, testScheduler);
} else {
return Observable.error(err);
}
}
)
.flatMap(x -> x))
.timeout(30, TimeUnit.SECONDS, testScheduler)
.onErrorResumeNext(throwable -> {
if (throwable instanceof TimeoutException) {
return Observable.just(42);
}
return Observable.error(throwable);
});
TestObserver<Integer> test = timeout.test();
testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
establishConnection.onError(new IOException("Exception 1"));
testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
establishConnection.onError(new IOException("Exception 2"));
testScheduler.advanceTimeBy(31, TimeUnit.SECONDS);
test.assertValue(42);
}
retryWhen
运算符通过重新订阅其上方的运算符链来工作。因为你把你的 timeout
放在它前面,所以说超时被重新订阅,因此又开始从头开始计数。
将 timeout
放在 之后 retryWhen
应该对整个可重试流应用全局超时。
我正在创建一个简单的应用程序,用于使用 RxAndroidBle 库连接蓝牙设备(为出色的工作干杯!)。
我遇到的情况有时是当我连接到设备时收到状态为 133 的 Gatt 错误。
我知道它可能会发生,所以我想做的是在发生该错误时重试所有操作。
这不是问题,我可以使用 retryWhen()
运算符轻松做到这一点,但是我还有另一个要求 - 流必须在之后终止
30 秒(如果连接不成功)。我为此使用 timeout()
,但问题是当我重试时,计时器再次启动。
所以问题是如何将 timeout() 运算符与 retryWhen() 结合使用,这样我就可以重试某些特定错误但保持计数器继续运行。
我有一些关于组合可观察对象的想法,或者一些单独的可观察对象,它们将在超时期限后检查连接状态,但我 想知道我是否可以在单个可观察对象中做到这一点。
到目前为止我的代码如下所示:
public Observable<ConnectingViewState> connectToDevice(String macAddress) {
final RxBleDevice rxBleDevice = rxBleClient.getBleDevice(macAddress);
return rxBleDevice.establishConnection(false)
.subscribeOn(Schedulers.io())
.map(rxBleConnection -> new ConnectingViewState.ConnectedViewState(rxBleConnection))
.cast(ConnectingViewState.class)
.timeout(40, TimeUnit.SECONDS)
.startWith(new ConnectingViewState.ConnectingInProgressViewState())
.retryWhen(errors -> errors.flatMap(error -> {
if (isDefaultGattError(error)) {
return Observable.just(new Object());
} else {
return Observable.error(error);
}
}
))
.onErrorReturn(throwable -> new ConnectingViewState.ErrorState(throwable));
}
正如所讨论的,我已经用 RxJava2 编写了一个测试。代码取自书 'Reactive Programming with RxJava'(第 257 页)
private final static int ATTEMPTS = 10;
@Test
public void name() throws Exception {
Subject<Integer> establishConnection = PublishSubject.create();
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timeout = establishConnection.
retryWhen(failures -> failures
.zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
{
// check here for your error if(...)
if (attempt < ATTEMPTS) {
long expDelay = (long) Math.pow(2, attempt - 2);
return Observable.timer(expDelay, TimeUnit.SECONDS, testScheduler);
} else {
return Observable.error(err);
}
}
)
.flatMap(x -> x))
.timeout(30, TimeUnit.SECONDS, testScheduler)
.onErrorResumeNext(throwable -> {
if (throwable instanceof TimeoutException) {
return Observable.just(42);
}
return Observable.error(throwable);
});
TestObserver<Integer> test = timeout.test();
testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
establishConnection.onError(new IOException("Exception 1"));
testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
establishConnection.onError(new IOException("Exception 2"));
testScheduler.advanceTimeBy(31, TimeUnit.SECONDS);
test.assertValue(42);
}
retryWhen
运算符通过重新订阅其上方的运算符链来工作。因为你把你的 timeout
放在它前面,所以说超时被重新订阅,因此又开始从头开始计数。
将 timeout
放在 之后 retryWhen
应该对整个可重试流应用全局超时。