Rx2 java,消费者接口不处理错误(套接字超时)。应用程序崩溃

Rx2 java, Consumer interface does not handle error (socket timeout). App crashes

我有以下订户用于注册来自 http 服务提供商的接受,但是当 url 格式错误时,我得到如下所示的无法捕获的异常,即 try-catch 不起作用。 (当 url 有效时,没问题)。

如何让这个防水?我想接收 "onError" 但这不是消费者界面的一部分。目前,应用程序因该错误事件而崩溃。也许是better/simpler直接使用Http,而不是RX?

try {
            someApi.setStationInfo(stationInfo)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<StationInfo>() {
                        @Override
                        public void accept(StationInfo abi) throws Exception {

                                                            System.out.println("TestAppZappPc received accept from endpoint, data: " + abi);
                        }
                    });
        } catch (Exception e) {
            e.printStackTrace();
        }

例外情况:

io.reactivex.exceptions.OnErrorNotImplementedException: 连接超时 在 io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) 在 io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) 在 io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77) 在 io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.checkTerminated(ObservableObserveOn.java:276) 在 io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:172) 在 io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:252) 在 io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) 在 io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) 在 java.util.concurrent.FutureTask.run(FutureTask.java:237) 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) 在 java.lang.Thread.run(Thread.java:761) 原因:java.net.SocketTimeoutException:连接超时 在 java.net.PlainSocketImpl.socketConnect(本机方法) 在 java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:334) 在 java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:196) 在 java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178) 在 java.net.SocksSocketImpl.connect(SocksSocketImpl.java:356) 在 java.net.Socket.connect(Socket.java:586) 在 okhttp3.internal.platform.AndroidPlatform.connectSocket(AndroidPlatform.java:71) 在 okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:240) 在 okhttp3.internal.connection.RealConnection.connect(RealConnection.java:160) 在 okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257) 在 okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135) 在 okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114) 在 okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 在 okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 在 okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 在 okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 在 okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:213) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 在 okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 在 okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200) 在 okhttp3.RealCall.execute(RealCall.java:77) 在 retrofit2.OkHttpCall.execute(OkHttpCall.java:174) 在 com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41) 在 io.reactivex.Observable.subscribe(Observable.java:10955) 在 com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(正文Observable.java:34) 2019-05-14 11:41:44.728 31475-32204/com.hdsl.a.zapp E/AndroidRuntime: 在 io.reactivex.Observable.subscribe(Observable.java:10955) 在 io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 在 io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) ... 还有 7 个

有两种不同的处理方式。

  1. 使用 subscribe 的 2 参数版本:subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)。第二个Consumer会在异常发生时被调用
  2. 内联处理:
someApi.setStationInfo(stationInfo)
       .subscribeOn(Schedulers.io())
       .observeOn(Schedulers.newThread())
       .toMaybe()
       .onErrorComplete(any -> true)
       .subscribe(abi ->                                                             
            System.out.println("TestAppZappPc received accept from endpoint, data: "
                 + abi)
       );