RxAndroid ViewObservable NetworkOnMainThreadException
RxAndroid ViewObservable NetworkOnMainThreadException
我有一个 Button
,我从中创建了一个 Observable<OnClickEvent>
。
单击按钮时,我希望从网络中获取文件,但我 运行 遇到有关网络和线程的问题。
这个例子抛出 android.os.NetworkOnMainThreadException
:
Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
.map(new Func1<OnClickEvent, List<String>>() {
@Override
public List<String> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponse();
}
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
所以我从另一个线程尝试。
以下抛出 rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main]
:
networkButtonObservable
.subscribeOn(Schedulers.newThread())
.map(new Func1<OnClickEvent, List<String>>() {
@Override
public List<String> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponse();
}
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
好的..现在我尝试在开始时使用 .debounce()
:
networkButtonObservable
.debounce(10, TimeUnit.MILLISECONDS)
.map(new Func1<OnClickEvent, List<String>>() {
@Override
public List<String> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponse();
}
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
这就成功了。
显然我不喜欢在我的代码中添加延迟,所以我试图弄清楚线程方面发生了什么。为什么第一个示例不在后台线程中执行 .map()
内的代码?
或者我在这里遗漏了什么?
---更新
我将我的 TestAPI 更改为 return 一个 Observable,并将对 networkButtonObservable 的第一次调用更改为 .flatMap()
。这也能正常工作。但是我仍然不知道为什么使用 .map()
的原始方法会失败。
networkButtonObservable
.flatMap(new Func1<OnClickEvent, Observable<?>>() {
@Override
public Observable<?> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponseObservable();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
我不是 Android 方面的专家,但根据错误消息,我认为您需要在主线程和后台线程之间反弹值。通常,Android 示例显示您将 subscribeOn
/observeOn
对添加到您的流处理中:
Observable.just(1)
.map(v -> doBackgroundWork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> {});
但在这些情况下,'source' 通常是您可以控制的冷观测值。
在你的问题中,源是一个热点Observable
,有特定的要求,你需要在主线程上订阅,但你需要在后台线程上进行网络调用,然后在主线程。
这种情况下,可以多次使用observeOn
:
networkButtonObservable
.subscribeOn(AndroidSchedulers.mainThread()) // just in case
.observeOn(Schedulers.io())
.map(v -> TestAPI.getTestService().fetchTestResponse())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> updateGUI(v));
我认为 fetchTestResponseObservable
应用了自己的 subscribeOn
或 observeOn
,因此它不会抛出网络异常。
我还想提一下,使用多个 subscribeOn
在功能上等同于只使用一个最接近发射源的线程,但从技术上讲,它会占用未使用的线程资源。但是,在流中使用多个 observeOn
具有相关性,因为您可以有意义地 'pipeline' 线程之间的流处理。
我有一个 Button
,我从中创建了一个 Observable<OnClickEvent>
。
单击按钮时,我希望从网络中获取文件,但我 运行 遇到有关网络和线程的问题。
这个例子抛出 android.os.NetworkOnMainThreadException
:
Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
.map(new Func1<OnClickEvent, List<String>>() {
@Override
public List<String> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponse();
}
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
所以我从另一个线程尝试。
以下抛出 rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main]
:
networkButtonObservable
.subscribeOn(Schedulers.newThread())
.map(new Func1<OnClickEvent, List<String>>() {
@Override
public List<String> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponse();
}
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
好的..现在我尝试在开始时使用 .debounce()
:
networkButtonObservable
.debounce(10, TimeUnit.MILLISECONDS)
.map(new Func1<OnClickEvent, List<String>>() {
@Override
public List<String> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponse();
}
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
这就成功了。
显然我不喜欢在我的代码中添加延迟,所以我试图弄清楚线程方面发生了什么。为什么第一个示例不在后台线程中执行 .map()
内的代码?
或者我在这里遗漏了什么?
---更新
我将我的 TestAPI 更改为 return 一个 Observable,并将对 networkButtonObservable 的第一次调用更改为 .flatMap()
。这也能正常工作。但是我仍然不知道为什么使用 .map()
的原始方法会失败。
networkButtonObservable
.flatMap(new Func1<OnClickEvent, Observable<?>>() {
@Override
public Observable<?> call(OnClickEvent onClickEvent) {
return TestAPI.getTestService().fetchTestResponseObservable();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {Log.w("Final result: " + o);
}
}
);
我不是 Android 方面的专家,但根据错误消息,我认为您需要在主线程和后台线程之间反弹值。通常,Android 示例显示您将 subscribeOn
/observeOn
对添加到您的流处理中:
Observable.just(1)
.map(v -> doBackgroundWork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> {});
但在这些情况下,'source' 通常是您可以控制的冷观测值。
在你的问题中,源是一个热点Observable
,有特定的要求,你需要在主线程上订阅,但你需要在后台线程上进行网络调用,然后在主线程。
这种情况下,可以多次使用observeOn
:
networkButtonObservable
.subscribeOn(AndroidSchedulers.mainThread()) // just in case
.observeOn(Schedulers.io())
.map(v -> TestAPI.getTestService().fetchTestResponse())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> updateGUI(v));
我认为 fetchTestResponseObservable
应用了自己的 subscribeOn
或 observeOn
,因此它不会抛出网络异常。
我还想提一下,使用多个 subscribeOn
在功能上等同于只使用一个最接近发射源的线程,但从技术上讲,它会占用未使用的线程资源。但是,在流中使用多个 observeOn
具有相关性,因为您可以有意义地 'pipeline' 线程之间的流处理。