如何用rxjava2取消Observable.timer?

How to cancel Observable.timer with rxjava2?

我的代码中有一个重试机制,我使用以下行来执行我的重试逻辑。例如,我生成一个随机毫秒来延迟我的执行。当计时器滴答到 30 * 1000 毫秒时,我想取消这个计时器。我怎样才能取消这个计时器并立即执行我的逻辑。

//register retryWhen
Observable.retryWhen(new RetryWhenException());

//retry code.
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{       
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception {
            return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Function<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> apply(Wrapper wrapper) throws Exception {
                    long delay = 60 * 1000;
                    //How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back?
                    return Observable.timer(delay, TimeUnit.MILLISECONDS);
                }
            });
        }
}

提前致谢。

如果我对你的问题的理解正确,你希望能够将多个条件组合到重试中,这意味着重试最多会在一定时间后发生(计时器),或者甚至在某些事件发生后更早(网络)已连接)。
在这种情况下,您需要结合这两个事件,首先,您需要一些 Observable 来通知网络事件(关于如何创建它的讨论不同,用它来包装系统广播事件应该不成问题Observable),那么你可以这样做:

private Observable<NetworkState> networkStateObservable;

public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> {
    public Observable<?> apply(
            final Observable<? extends Throwable> observable) throws Exception {
        return observable.zipWith(Observable.range(1, count + 1), Wrapper::new)
                .flatMap(wrapper -> {
                    long delay = 60 * 1000;
                    Observable<NetworkState> networkConnectedEvents =
                            networkStateObservable.filter(networkState -> networkState.isConnected())
                               take(1);
                    Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS);

                    return Observable.amb(Arrays.asList(networkConnectedEvents, timer));
                });
    }
}

网络状态 Observable 被过滤为仅在连接时收到通知,take(1) 也是为了确保在第一次收到通知后我们将取消订阅(不再需要收听) ).
amb() 运算符似乎非常适合这里,因为它会选择将首先发出的 Observable,并取消订阅另一个,这意味着如果网络 Observable 在计时器之前发出,则计时器Observable 将取消订阅(= 计时器将被取消)。

编辑:

删除了错误的 takeUntil(networkConnectedEvents),不需要它,因为 amb 会在需要时取消订阅。

非常感谢您的回复。这正是我要找的。我按照你的描述创建了一个网络 Observable 包装器, 在调用 onNetworkChanged() 之后,networkStateObservable 似乎没有发出(state.isConnected() 为真)。 如何使 networkStateObservable 正确发出?非常感谢。 我在

中调用了以下行
e.onNext(state);  
 e.onComplete();

我发射 networkStateObservable 是不是错了?

Observable<NetworkState> networkStateObservable = getNetworkObservableWrapper();
public Observable<NetworkState> getNetworkObservableWrapper() {
        return Observable.create(new ObservableOnSubscribe<NetworkState>() {
            @Override
            public void subscribe(final ObservableEmitter<NetworkState> e) throws Exception {
                final NetworkChangedListener listener = new NetworkChangedListener() {
                    @Override
                    public void onNetworkChanged(NetworkState state) {
                        //Below lines were called, but Observable.amb(Arrays.asList(networkConnectedEvents, timer)); didn't work.                       
                        e.onNext(state);
                        e.onComplete();
                    }
                };
                registerNetworkChanged(listener);
            }
        });
    }