如何用rxjava2取消Observable.timer?
How to cancel Observable.timer with rxjava2?
我的代码中有一个重试机制,我使用以下行来执行我的重试逻辑。例如,我生成一个随机毫秒来延迟我的执行。当计时器滴答到 30 * 1000 毫秒时,我想取消这个计时器。我怎样才能取消这个计时器并立即执行我的逻辑。
//register retryWhen
Observable.retryWhen(new RetryWhenException());
//retry code.
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
@Override
public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
return new Wrapper(throwable, integer);
}
}).flatMap(new Function<Wrapper, Observable<?>>() {
@Override
public Observable<?> apply(Wrapper wrapper) throws Exception {
long delay = 60 * 1000;
//How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back?
return Observable.timer(delay, TimeUnit.MILLISECONDS);
}
});
}
}
提前致谢。
如果我对你的问题的理解正确,你希望能够将多个条件组合到重试中,这意味着重试最多会在一定时间后发生(计时器),或者甚至在某些事件发生后更早(网络)已连接)。
在这种情况下,您需要结合这两个事件,首先,您需要一些 Observable
来通知网络事件(关于如何创建它的讨论不同,用它来包装系统广播事件应该不成问题Observable),那么你可以这样做:
private Observable<NetworkState> networkStateObservable;
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> {
public Observable<?> apply(
final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), Wrapper::new)
.flatMap(wrapper -> {
long delay = 60 * 1000;
Observable<NetworkState> networkConnectedEvents =
networkStateObservable.filter(networkState -> networkState.isConnected())
take(1);
Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS);
return Observable.amb(Arrays.asList(networkConnectedEvents, timer));
});
}
}
网络状态 Observable
被过滤为仅在连接时收到通知,take(1)
也是为了确保在第一次收到通知后我们将取消订阅(不再需要收听) ).
amb()
运算符似乎非常适合这里,因为它会选择将首先发出的 Observable
,并取消订阅另一个,这意味着如果网络 Observable
在计时器之前发出,则计时器Observable
将取消订阅(= 计时器将被取消)。
编辑:
删除了错误的 takeUntil(networkConnectedEvents),不需要它,因为 amb 会在需要时取消订阅。
非常感谢您的回复。这正是我要找的。我按照你的描述创建了一个网络 Observable 包装器,
在调用 onNetworkChanged() 之后,networkStateObservable 似乎没有发出(state.isConnected() 为真)。
如何使 networkStateObservable 正确发出?非常感谢。
我在
中调用了以下行
e.onNext(state);
e.onComplete();
我发射 networkStateObservable 是不是错了?
Observable<NetworkState> networkStateObservable = getNetworkObservableWrapper();
public Observable<NetworkState> getNetworkObservableWrapper() {
return Observable.create(new ObservableOnSubscribe<NetworkState>() {
@Override
public void subscribe(final ObservableEmitter<NetworkState> e) throws Exception {
final NetworkChangedListener listener = new NetworkChangedListener() {
@Override
public void onNetworkChanged(NetworkState state) {
//Below lines were called, but Observable.amb(Arrays.asList(networkConnectedEvents, timer)); didn't work.
e.onNext(state);
e.onComplete();
}
};
registerNetworkChanged(listener);
}
});
}
我的代码中有一个重试机制,我使用以下行来执行我的重试逻辑。例如,我生成一个随机毫秒来延迟我的执行。当计时器滴答到 30 * 1000 毫秒时,我想取消这个计时器。我怎样才能取消这个计时器并立即执行我的逻辑。
//register retryWhen
Observable.retryWhen(new RetryWhenException());
//retry code.
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
@Override
public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
return new Wrapper(throwable, integer);
}
}).flatMap(new Function<Wrapper, Observable<?>>() {
@Override
public Observable<?> apply(Wrapper wrapper) throws Exception {
long delay = 60 * 1000;
//How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back?
return Observable.timer(delay, TimeUnit.MILLISECONDS);
}
});
}
}
提前致谢。
如果我对你的问题的理解正确,你希望能够将多个条件组合到重试中,这意味着重试最多会在一定时间后发生(计时器),或者甚至在某些事件发生后更早(网络)已连接)。
在这种情况下,您需要结合这两个事件,首先,您需要一些 Observable
来通知网络事件(关于如何创建它的讨论不同,用它来包装系统广播事件应该不成问题Observable),那么你可以这样做:
private Observable<NetworkState> networkStateObservable;
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> {
public Observable<?> apply(
final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), Wrapper::new)
.flatMap(wrapper -> {
long delay = 60 * 1000;
Observable<NetworkState> networkConnectedEvents =
networkStateObservable.filter(networkState -> networkState.isConnected())
take(1);
Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS);
return Observable.amb(Arrays.asList(networkConnectedEvents, timer));
});
}
}
网络状态 Observable
被过滤为仅在连接时收到通知,take(1)
也是为了确保在第一次收到通知后我们将取消订阅(不再需要收听) ).
amb()
运算符似乎非常适合这里,因为它会选择将首先发出的 Observable
,并取消订阅另一个,这意味着如果网络 Observable
在计时器之前发出,则计时器Observable
将取消订阅(= 计时器将被取消)。
编辑:
删除了错误的 takeUntil(networkConnectedEvents),不需要它,因为 amb 会在需要时取消订阅。
非常感谢您的回复。这正是我要找的。我按照你的描述创建了一个网络 Observable 包装器, 在调用 onNetworkChanged() 之后,networkStateObservable 似乎没有发出(state.isConnected() 为真)。 如何使 networkStateObservable 正确发出?非常感谢。 我在
中调用了以下行e.onNext(state);
e.onComplete();
我发射 networkStateObservable 是不是错了?
Observable<NetworkState> networkStateObservable = getNetworkObservableWrapper();
public Observable<NetworkState> getNetworkObservableWrapper() {
return Observable.create(new ObservableOnSubscribe<NetworkState>() {
@Override
public void subscribe(final ObservableEmitter<NetworkState> e) throws Exception {
final NetworkChangedListener listener = new NetworkChangedListener() {
@Override
public void onNetworkChanged(NetworkState state) {
//Below lines were called, but Observable.amb(Arrays.asList(networkConnectedEvents, timer)); didn't work.
e.onNext(state);
e.onComplete();
}
};
registerNetworkChanged(listener);
}
});
}