RxJava NetworkOnMainThreadException while subscribing

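I'm trying to call an API and post the result to my LiveData, but after I subscribe I get a NetworkOnMainThreadException in onError. I tried different schedulers, with no luck. The stack trace from onError is included below.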

private void pullLocation(){

    myLocationService.getLocation()
            .flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
                @Override
                public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
                    return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key");
                }
            })

            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new Subscriber<DistanceResponseModel>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe in DistanceViewModel called. ");
        }

        @Override
        public void onNext(DistanceResponseModel distanceResponseModel) {
            distanceLiveData.postValue(distanceResponseModel);
            Log.d(TAG, "onNext in DistanceViewModel called. ");
        }

        @Override
        public void onError(Throwable t) {
            Log.e("YOUR_APP_LOG_TAG", "I got an error:", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete in DistanceViewModel called. ");
        }
    });
}

Edit:

Added the detailed stack trace of the error:

E/YOUR_APP_LOG_TAG: I got an error
    android.os.NetworkOnMainThreadException
        at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:102)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:90)
        at java.net.InetAddress.getAllByName(InetAddress.java:787)
        at okhttp3.Dns.lambda$static$0(Dns.java:39)
        at okhttp3.-$$Lambda$DnsevC3uO-H_z08sS9O-4-hLhZ8es.lookup(Unknown Source:0)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
        at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187)
        at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
        at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
        at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
        at okhttp3.RealCall.execute(RealCall.java:81)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
        at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
        at io.reactivex.Observable.subscribe(Observable.java:12030)
        at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12030)
        at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
        at io.reactivex.Single.subscribe(Single.java:3394)
        at io.reactivex.internal.operators.flowable.FlowableFlatMapSingle$FlatMapSingleSubscriber.onNext(FlowableFlatMapSingle.java:132)
        at io.reactivex.internal.operators.flowable.FlowableHide$HideSubscriber.onNext(FlowableHide.java:68)
        at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
        at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
        at com.example.compass.viewModels.MyLocationServiceClass.onLocationChanged(MyLocationServiceClass.java:35)
        at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
        at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
        at android.location.LocationManager$ListenerTransport.handleMessage(LocationManager.java:237)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:164)
        at android.app.ActivityThread.main(ActivityThread.java:6494)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)

This is where I get the location:

public class MyLocationServiceClass implements MyLocationService {

    PublishProcessor<Location> stream = PublishProcessor.create();
    LocationListener listener = new LocationListener() {
        @Override
        public void onLocationChanged(@NonNull Location location) {
            stream.onNext(location);
            Log.d(TAG, "onLocationChanged: stream.onNext(location) called");
        }

        @Override
        public void onProviderEnabled(@NonNull String provider) {

        }

        @Override
        public void onProviderDisabled(@NonNull String provider) {

        }

        @Override
        public void onStatusChanged(String provider, int status, Bundle extras) {

        }
    };

    private LocationManager locationManager;
    private boolean flag;
    private Context context;

    public MyLocationServiceClass(Context context, LocationManager locationManager, boolean flag) {
        this.context = context;
        this.locationManager = locationManager;
        this.flag = flag;

    }

    @OnLifecycleEvent(Lifecycle.Event.ON_START)
    void onStart() {
        if (flag) {
            start();
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
    void onStop() {
        locationManager.removeUpdates(listener);
    }


    @Override
    public Flowable<Location> getLocation() {
        return stream.hide();
    }

    @Override
    public void start() {
        long minTimeMs = 10000;
        float minDistanceM = 5.5F;


        if (ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) != PackageManager.PERMISSION_GRANTED && ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_COARSE_LOCATION) != PackageManager.PERMISSION_GRANTED) {

            return;
        }
        Log.d(TAG, "start(): requestLocationUpdates called.");
        locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener);

    }

    @Override
    public void updatePermission(boolean newState) {
        flag = newState;
    }
}

public interface MyLocationService extends LifecycleObserver {

    Flowable<Location> getLocation();

    void start();
    void updatePermission(boolean newState);
}

Any idea what it could be? Let me know if you need more details. I hope this helps.

Repo with the full code: https://github.com/LightingTT/Compass

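Problem

When distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key") is called, an onError (NetworkOnMainThreadException) is propagated to the subscriber.

Why is the exception thrown?

Android forbids network requests on the UI event loop (the main thread) so that the UI is not blocked.

Why does subscribeOn not help?

subscribeOn calls the subscribeActual method of the upstream operator on the given scheduler. In your case, the subscription to the flatMapSingle operator is made from a worker thread of the IO scheduler. flatMapSingle then subscribes to the Flowable returned by myLocationService.getLocation() on that same thread.

As you can see in the stack trace, the onNext of the source Flowable returned by getLocation is emitted from the UI thread: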
    at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
    at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
    at com.example.compass.viewModels.MyLocationServiceClass.onLocationChanged(MyLocationServiceClass.java:35)
    at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
    at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
    at android.location.LocationManager$ListenerTransport.handleMessage(LocationManager.java:237)
    at android.os.Handler.dispatchMessage(Handler.java:106)
    at android.os.Looper.loop(Looper.java:164)

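The value is emitted via onNext on the calling thread (the UI thread), because the LocationListener callback is invoked from the UI thread. The flatMapSingle operator therefore also runs on the UI thread. It takes the Single returned by the lambda and subscribes to it on the calling thread. As a result, the network request is executed on the UI thread and the Android runtime throws the exception. subscribeOn does not guarantee that onNext is delivered downstream on the given scheduler. It only ensures that the subscription happens on that thread. In your case, the source Flowable is notified by the Android runtime on the UI thread.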
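The difference between the subscribing thread and the emitting thread can be reproduced without RxJava. The following is a minimal, library-free sketch, not RxJava's actual implementation: HotSource is a hypothetical stand-in for a hot source such as PublishProcessor, and the "io-worker" executor stands in for Schedulers.io(). Subscribing on the worker has no effect on where the callback later runs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class EmissionThreadDemo {

    // Hypothetical stand-in for a hot source: onNext delivers
    // synchronously on whatever thread calls it.
    static final class HotSource<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Subscribes on a worker thread, emits from the calling thread, and
    // returns the name of the thread on which the callback actually ran.
    public static String callbackThread() {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            HotSource<String> source = new HotSource<>();
            CompletableFuture<String> seenOn = new CompletableFuture<>();

            // Roughly what subscribeOn does: only the act of subscribing
            // is moved to the worker thread.
            CompletableFuture.runAsync(
                    () -> source.subscribe(
                            value -> seenOn.complete(Thread.currentThread().getName())),
                    io).join();

            // The emission happens on the caller, like onLocationChanged
            // being invoked on the UI thread.
            source.onNext("location");
            return seenOn.join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("emitted on:      " + Thread.currentThread().getName());
        System.out.println("callback ran on: " + callbackThread());
    }
}
```

Both lines print the same thread name, because the callback runs wherever onNext was called, regardless of which thread registered the subscriber.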

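Solution?

Use observeOn immediately after myLocationService.getLocation() to switch threads between operators. observeOn ensures that downstream onNext calls happen on the given scheduler, so the subscription to the inner stream in flatMapSingle happens on a worker thread of that scheduler and not on the UI thread. In your code observeOn comes after flatMapSingle, so it only affects the subscriber. Alternatively, you can apply subscribeOn to the Single inside flatMapSingle, which ensures that the network call runs on a worker thread of the given scheduler.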

        .flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
            @Override
            public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
                return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
                                         .subscribeOn(Schedulers.io());
            }
        })
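For the observeOn variant, the chain would start with myLocationService.getLocation().observeOn(Schedulers.io()).flatMapSingle(...). The two options can be compared in a library-free sketch. This is only a model under stated assumptions, not RxJava code: HotSource, fakeNetworkCall and the "io-worker" executor are hypothetical stand-ins for the location stream, the Retrofit call and Schedulers.io().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class ThreadHopFixDemo {

    // Hypothetical hot source: delivers on the thread that calls onNext.
    static final class HotSource<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Stand-in for the blocking request; reports the thread it ran on.
    static String fakeNetworkCall(String location) {
        return Thread.currentThread().getName();
    }

    static ExecutorService newIo() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
    }

    // Option 1, observeOn-like: every emitted value is handed to the
    // worker first, so the mapper and the call both run there.
    public static String hopBeforeMapper() {
        ExecutorService io = newIo();
        try {
            HotSource<String> source = new HotSource<>();
            CompletableFuture<String> result = new CompletableFuture<>();
            source.subscribe(
                    location -> io.execute(() -> result.complete(fakeNetworkCall(location))));
            source.onNext("50.06,19.94");
            return result.join();
        } finally {
            io.shutdown();
        }
    }

    // Option 2, like subscribeOn on the inner Single: the mapper still runs
    // on the emitting thread, but the inner work is scheduled on the worker.
    public static String scheduleInnerWork() {
        ExecutorService io = newIo();
        try {
            HotSource<String> source = new HotSource<>();
            CompletableFuture<String> result = new CompletableFuture<>();
            source.subscribe(location -> {
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> fakeNetworkCall(location), io);
                inner.thenAccept(result::complete);
            });
            source.onNext("50.06,19.94");
            return result.join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("hop before mapper:   " + hopBeforeMapper());
        System.out.println("schedule inner work: " + scheduleInnerWork());
    }
}
```

In both cases the simulated request reports the worker thread instead of the emitting thread. The difference is only where the mapper itself runs, which matters if the mapper does more than build the request.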

Further reading

http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html?m=1