RxJava2: Unable to handle exception for asynchronous callback using retryWhen

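I am trying to connect to an MQTT broker, and I want to retry if the connection fails. I receive a callback on connection success or failure.

After reading several retryWhen examples and posts on handling asynchronous callbacks, I put this code together. If the connection succeeds, it works fine. Also, if I call e.onError(throwable) synchronously from the Flowable, it retries 3 times. However, if I call e.onError(throwable) from the callback's onFailure() method, it crashes my Android app.

The code is as follows: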

RxJava chain

createConnectionFlowable(client, options)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .retryWhen(createRetryFunction())
    .subscribe(createConsumer());

Create a Flowable

private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<String>() {

        public void subscribe(final FlowableEmitter<String> e) throws Exception {
                client.connect(options).setActionCallback(new IMqttActionListener() {
                    public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
                    public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
                });
        }
    }, BackpressureStrategy.BUFFER);
}

Create a retry function

private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
    return new Function<Flowable<Throwable>, Publisher<?>>() {

        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.zipWith(
                    Flowable.range(1, 3),
                    new BiFunction<Throwable, Integer, Integer>() {
                        public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
                    }
            )
            .flatMap(new Function<Integer, Publisher<?>>() {
                public Publisher<?> apply(Integer integer) throws Exception {
                    return Flowable.timer(integer, TimeUnit.SECONDS);
                }
            });
        }
    };
}

The Consumer: do all the good stuff here

private Consumer<String> createConsumer() {
    return new Consumer<String>() {
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: do important stuff here" + s);
        }
    };
}

Error logs

12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
                                                                     Process: com.work.app, PID: 16769
                                                                     Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: 

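Questions

  1. Why does this code throw an exception that crashes the app? Ideally it should handle the exception. What am I missing here?
  2. Why doesn't it retry 3 times?
  3. If I call e.onError(throwable) synchronously from the Flowable's subscribe() method, why does the same code retry correctly?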

References

  1. RxJava 1.x retryWhen doc
  2. This blog
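
  1. Because you subscribe with only a Consumer<String>, no error handler is defined for the stream. Errors are therefore passed to the global error handler via RxJavaPlugins.onError(...). On Android, this handler appears to result in a fatal exception. To fix this, subscribe with something that also handles errors, such as a Subscriber<String> (the Flowable counterpart of Observer<String>), instead of a bare Consumer<String>.
  2. The logs seem to indicate that the client failed 3 times ("onFailure" is mentioned 3 times), and that it did so outside of Rx. If I had to guess, the client may be stateful, meaning that calls to client.connect(...) after the initial connection attempt exhibit some odd behaviour that causes the problem. Since the logs show error, 1 second wait, error, error, my guess is that the first callback is still active, so the second failure is delivered to RxJava twice.
  3. Assuming that by "synchronously" you mean the waitForCompletion() method, that would support my hypothesis in 2. Since no callback is registered, each throwable is reported only once, which fixes the behaviour.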

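I'm not sure why the emitter still works after it has terminated (onError/onComplete), but since the specification requires these methods to be called only once, this may be unspecified behaviour that causes the problem.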
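
One way to test the duplicate-callback hypothesis is to wrap the listener so that only the first terminal callback is forwarded. The following is a minimal plain-Java sketch of that guard, not code from the question: `ConnectListener` is a hypothetical stand-in for Paho's IMqttActionListener, and `onceOnly` and `simulateDuplicateFailure` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class OnceOnlyListenerDemo {

    /** Hypothetical stand-in for a connect callback such as IMqttActionListener. */
    interface ConnectListener {
        void onSuccess();
        void onFailure(Throwable cause);
    }

    /** Wraps a listener so that only the first terminal callback is forwarded. */
    static ConnectListener onceOnly(final ConnectListener delegate) {
        final AtomicBoolean done = new AtomicBoolean(false);
        return new ConnectListener() {
            @Override
            public void onSuccess() {
                if (done.compareAndSet(false, true)) {
                    delegate.onSuccess();
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                if (done.compareAndSet(false, true)) {
                    delegate.onFailure(cause);
                }
            }
        };
    }

    /** Simulates a client that reports the same failure twice; returns what got through. */
    static List<String> simulateDuplicateFailure() {
        final List<String> received = new ArrayList<>();
        ConnectListener guarded = onceOnly(new ConnectListener() {
            @Override
            public void onSuccess() {
                received.add("success");
            }

            @Override
            public void onFailure(Throwable cause) {
                received.add("failure: " + cause.getMessage());
            }
        });
        Throwable refused = new java.io.IOException("ECONNREFUSED");
        guarded.onFailure(refused);
        guarded.onFailure(refused); // duplicate report, dropped by the guard
        return received;
    }

    public static void main(String[] args) {
        System.out.println(simulateDuplicateFailure());
    }
}
```

In the question's code, the same idea would mean forwarding to e.onComplete() and e.onError(throwable) through such a guard, so that a repeated onFailure() from an earlier connect attempt cannot signal the emitter a second time.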

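I finally figured it out!

It turns out this was not an RxJava2 problem, but rather the way MQTT (Eclipse Paho) runs the IMqttActionListener callbacks on the main thread, even if the client was created on a different thread!

The simple solution is to wait for the client to connect on the thread that created it. The code shared in the question is correct except for this method: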

@NonNull
public Flowable<Boolean> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final FlowableEmitter<Boolean> e) throws Exception {
            IMqttToken connect = client.connect(options);
            connect.waitForCompletion(); //this is blocking and is what was required!!
            if (client.isConnected()) {
                e.onNext(true);
                e.onComplete();
            } else {
                e.onError(connect.getException());
            }

        }
    }, BackpressureStrategy.BUFFER);
}
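
Conceptually, a blocking wait such as waitForCompletion() turns the asynchronous callback into a synchronous result, so the outcome is observed on the subscribing thread rather than on whichever thread delivers the callbacks. The sketch below illustrates that idea in plain Java under stated assumptions: `Callback`, `connectAsync`, and `connectBlocking` are hypothetical names, not Paho APIs, and this is not how Paho implements the method internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class BlockingBridgeDemo {

    /** Hypothetical callback type, standing in for IMqttActionListener. */
    interface Callback {
        void onSuccess();
        void onFailure(Throwable cause);
    }

    /** Hypothetical async API that always reports its result on another thread. */
    static void connectAsync(final boolean succeed, ExecutorService callbackThread, final Callback cb) {
        callbackThread.execute(new Runnable() {
            @Override
            public void run() {
                if (succeed) {
                    cb.onSuccess();
                } else {
                    cb.onFailure(new java.io.IOException("connection refused"));
                }
            }
        });
    }

    /** Blocks the caller until the callback fires, then rethrows any failure on the caller's thread. */
    static boolean connectBlocking(boolean succeed, ExecutorService callbackThread) throws Exception {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<Throwable> error = new AtomicReference<>();
        connectAsync(succeed, callbackThread, new Callback() {
            @Override
            public void onSuccess() {
                done.countDown();
            }

            @Override
            public void onFailure(Throwable cause) {
                error.set(cause);
                done.countDown();
            }
        });
        done.await(); // the calling thread waits here, like a blocking connect
        Throwable cause = error.get();
        if (cause != null) {
            throw new Exception("connect failed", cause);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println("connected: " + connectBlocking(true, callbackThread));
            try {
                connectBlocking(false, callbackThread);
            } catch (Exception e) {
                System.out.println("failed on caller thread: " + e.getCause().getMessage());
            }
        } finally {
            callbackThread.shutdown();
        }
    }
}
```

Because the failure is rethrown on the calling thread, an enclosing Flowable.create body would signal the error exactly once from the thread chosen by subscribeOn.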

Hope this helps anyone using these libraries :)