订阅时出现 RxJava NetworkOnMainThreadException
RxJava NetworkOnMainThreadException while subscribing
我正在尝试调用 API 并将数据添加到我的 LiveData,但在我订阅之后,我在 OnError 中收到 MainThreadException。尝试了不同的调度程序,但没有成功。从 onError 添加堆栈跟踪。
private void pullLocation(){
myLocationService.getLocation()
.flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
@Override
public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key");
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<DistanceResponseModel>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe in DistanceViewModel called. ");
}
@Override
public void onNext(DistanceResponseModel distanceResponseModel) {
distanceLiveData.postValue(distanceResponseModel);
Log.d(TAG, "onNext in DistanceViewModel called. ");
}
@Override
public void onError(Throwable t) {
Log.e("YOUR_APP_LOG_TAG", "I got an error:", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete in DistanceViewModel called. ");
}
});
}
编辑:
添加了错误代码的详细堆栈跟踪:
E/YOUR_APP_LOG_TAG: I got an error
android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:102)
at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:90)
at java.net.InetAddress.getAllByName(InetAddress.java:787)
at okhttp3.Dns.lambda$static[=11=](Dns.java:39)
at okhttp3.-$$Lambda$DnsevC3uO-H_z08sS9O-4-hLhZ8es.lookup(Unknown Source:0)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
at okhttp3.RealCall.execute(RealCall.java:81)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:35)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
at io.reactivex.Single.subscribe(Single.java:3394)
at io.reactivex.internal.operators.flowable.FlowableFlatMapSingle$FlatMapSingleSubscriber.onNext(FlowableFlatMapSingle.java:132)
at io.reactivex.internal.operators.flowable.FlowableHide$HideSubscriber.onNext(FlowableHide.java:68)
at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
at com.example.compass.viewModels.MyLocationServiceClass.onLocationChanged(MyLocationServiceClass.java:35)
at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
at android.location.LocationManager$ListenerTransport.handleMessage(LocationManager.java:237)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:164)
at android.app.ActivityThread.main(ActivityThread.java:6494)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)
这是我获取位置的地方:
public class MyLocationServiceClass implements MyLocationService {
PublishProcessor<Location> stream = PublishProcessor.create();
LocationListener listener = new LocationListener() {
@Override
public void onLocationChanged(@NonNull Location location) {
stream.onNext(location);
Log.d(TAG, "onLocationChanged: stream.onNext(location) called");
}
@Override
public void onProviderEnabled(@NonNull String provider) {
}
@Override
public void onProviderDisabled(@NonNull String provider) {
}
@Override
public void onStatusChanged(String provider, int status, Bundle extras) {
}
};
private LocationManager locationManager;
private boolean flag;
private Context context;
public MyLocationServiceClass(Context context, LocationManager locationManager, boolean flag) {
this.context = context;
this.locationManager = locationManager;
this.flag = flag;
}
@OnLifecycleEvent(Lifecycle.Event.ON_START)
void onStart() {
if (flag) {
start();
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_STOP)
void onStop() {
locationManager.removeUpdates(listener);
}
@Override
public Flowable<Location> getLocation() {
return stream.hide();
}
@Override
public void start() {
long minTimeMs = 10000;
float minDistanceM = 5.5F;
if (ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) != PackageManager.PERMISSION_GRANTED && ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_COARSE_LOCATION) != PackageManager.PERMISSION_GRANTED) {
return;
}
Log.d(TAG, "start(): requestLocationUpdates called.");
locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener);
}
@Override
public void updatePermission(boolean newState) {
flag = newState;
}
public interface MyLocationService extends LifecycleObserver {
Flowable<Location> getLocation();
void start();
void updatePermission(boolean newState);
}
知道会是什么吗?如果您需要更多详细信息,请告诉我。希望这会有所帮助。
问题
当调用 distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
.
时,一个 onError (NetworkOnMainThreadException) 被传播到订阅者
为什么抛出异常?
禁止在 UI 上进行网络请求-Eventloop,以免阻塞 UI。
为什么 subscribeOn 没有帮助?
subscribeOn
调用给定调度程序上游运算符的 subscribeActual 方法。在您的情况下,订阅 flatMapSingle
运算符是从 IO-Scheduler 中的工作线程调用的。 flatMapSingle
在同一线程上调用 myLocationService.getLocation()
。
正如您在堆栈跟踪中看到的那样,从 getLocation
返回的来自 source-observable 的 onNext 是从 UI-thread
发出的
at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
at com.example.compass.viewModels.MyLocationServiceClass.onLocationChanged(MyLocationServiceClass.java:35)
at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
at android.location.LocationManager$ListenerTransport.handleMessage(LocationManager.java:237)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:164)
该值是通过调用线程(UI-线程)上的 onNext 发出的,因为回调是从 UI-线程调用的。 flatMapSingle
运算符也在调用 UI 线程上被调用。它订阅从 lambda 返回的 Single
并在调用线程上订阅它。因此,UI-thread 调用 UI-thread 上的网络请求,Android 运行时抛出异常。 SubscribeOn
不能确定 onNext 在给定的 Scheduler
上是下游。它只确保订阅发生在这个线程中。在你的例子中,你的 source-observable 将收到 UI-Thread.Android runtime 的通知。
解决方案?
在 myLocationService.getLocation()
之后立即使用 observeOn
以便将线程从一个操作员切换到另一个操作员。 observeOn
确保 onNext
调用下游发生在给定的调度程序上。因此,对 flatMapSingle
中内部流的订阅将发生在给定的调度程序工作线程上,而不是 ui 线程上。您还可以在 flatMapSingle
中的 Single
上应用 subscribeOn
。这将确保网络调用将发生在给定的调度程序工作线程上。
.flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
@Override
public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
.subscribeOn(Schedulers.io());
}
})
进一步阅读
http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html?m=1
我正在尝试调用 API 并将数据添加到我的 LiveData,但在我订阅之后,我在 OnError 中收到 MainThreadException。尝试了不同的调度程序,但没有成功。从 onError 添加堆栈跟踪。
private void pullLocation(){
myLocationService.getLocation()
.flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
@Override
public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key");
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<DistanceResponseModel>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe in DistanceViewModel called. ");
}
@Override
public void onNext(DistanceResponseModel distanceResponseModel) {
distanceLiveData.postValue(distanceResponseModel);
Log.d(TAG, "onNext in DistanceViewModel called. ");
}
@Override
public void onError(Throwable t) {
Log.e("YOUR_APP_LOG_TAG", "I got an error:", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete in DistanceViewModel called. ");
}
});
}
编辑:
添加了错误代码的详细堆栈跟踪:
E/YOUR_APP_LOG_TAG: I got an error
android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:102)
at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:90)
at java.net.InetAddress.getAllByName(InetAddress.java:787)
at okhttp3.Dns.lambda$static[=11=](Dns.java:39)
at okhttp3.-$$Lambda$DnsevC3uO-H_z08sS9O-4-hLhZ8es.lookup(Unknown Source:0)
at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135)
at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187)
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
at okhttp3.RealCall.execute(RealCall.java:81)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:35)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
at io.reactivex.Single.subscribe(Single.java:3394)
at io.reactivex.internal.operators.flowable.FlowableFlatMapSingle$FlatMapSingleSubscriber.onNext(FlowableFlatMapSingle.java:132)
at io.reactivex.internal.operators.flowable.FlowableHide$HideSubscriber.onNext(FlowableHide.java:68)
at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
at com.example.compass.viewModels.MyLocationServiceClass.onLocationChanged(MyLocationServiceClass.java:35)
at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
at android.location.LocationManager$ListenerTransport.handleMessage(LocationManager.java:237)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:164)
at android.app.ActivityThread.main(ActivityThread.java:6494)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)
这是我获取位置的地方:
public class MyLocationServiceClass implements MyLocationService {
PublishProcessor<Location> stream = PublishProcessor.create();
LocationListener listener = new LocationListener() {
@Override
public void onLocationChanged(@NonNull Location location) {
stream.onNext(location);
Log.d(TAG, "onLocationChanged: stream.onNext(location) called");
}
@Override
public void onProviderEnabled(@NonNull String provider) {
}
@Override
public void onProviderDisabled(@NonNull String provider) {
}
@Override
public void onStatusChanged(String provider, int status, Bundle extras) {
}
};
private LocationManager locationManager;
private boolean flag;
private Context context;
public MyLocationServiceClass(Context context, LocationManager locationManager, boolean flag) {
this.context = context;
this.locationManager = locationManager;
this.flag = flag;
}
@OnLifecycleEvent(Lifecycle.Event.ON_START)
void onStart() {
if (flag) {
start();
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_STOP)
void onStop() {
locationManager.removeUpdates(listener);
}
@Override
public Flowable<Location> getLocation() {
return stream.hide();
}
@Override
public void start() {
long minTimeMs = 10000;
float minDistanceM = 5.5F;
if (ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) != PackageManager.PERMISSION_GRANTED && ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_COARSE_LOCATION) != PackageManager.PERMISSION_GRANTED) {
return;
}
Log.d(TAG, "start(): requestLocationUpdates called.");
locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener);
}
@Override
public void updatePermission(boolean newState) {
flag = newState;
}
public interface MyLocationService extends LifecycleObserver {
Flowable<Location> getLocation();
void start();
void updatePermission(boolean newState);
}
知道会是什么吗?如果您需要更多详细信息,请告诉我。希望这会有所帮助。
问题
当调用 distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
.
为什么抛出异常?
禁止在 UI 上进行网络请求-Eventloop,以免阻塞 UI。
为什么 subscribeOn 没有帮助?
subscribeOn
调用给定调度程序上游运算符的 subscribeActual 方法。在您的情况下,订阅 flatMapSingle
运算符是从 IO-Scheduler 中的工作线程调用的。 flatMapSingle
在同一线程上调用 myLocationService.getLocation()
。
正如您在堆栈跟踪中看到的那样,从 getLocation
返回的来自 source-observable 的 onNext 是从 UI-thread
at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
at com.example.compass.viewModels.MyLocationServiceClass.onLocationChanged(MyLocationServiceClass.java:35)
at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
at android.location.LocationManager$ListenerTransport.handleMessage(LocationManager.java:237)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:164)
该值是通过调用线程(UI-线程)上的 onNext 发出的,因为回调是从 UI-线程调用的。 flatMapSingle
运算符也在调用 UI 线程上被调用。它订阅从 lambda 返回的 Single
并在调用线程上订阅它。因此,UI-thread 调用 UI-thread 上的网络请求,Android 运行时抛出异常。 SubscribeOn
不能确定 onNext 在给定的 Scheduler
上是下游。它只确保订阅发生在这个线程中。在你的例子中,你的 source-observable 将收到 UI-Thread.Android runtime 的通知。
解决方案?
在 myLocationService.getLocation()
之后立即使用 observeOn
以便将线程从一个操作员切换到另一个操作员。 observeOn
确保 onNext
调用下游发生在给定的调度程序上。因此,对 flatMapSingle
中内部流的订阅将发生在给定的调度程序工作线程上,而不是 ui 线程上。您还可以在 flatMapSingle
中的 Single
上应用 subscribeOn
。这将确保网络调用将发生在给定的调度程序工作线程上。
.flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
@Override
public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
.subscribeOn(Schedulers.io());
}
})
进一步阅读
http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html?m=1