在后台线程上处理 Observable
Process Observable on background thread
我正在使用 RxAndroid 进行流操作。在我的实际用例中,我正在从服务器获取列表(使用 Retrofit)。我正在使用调度程序在后台线程上完成工作,并在 Android UI(主)线程上获得最终发射。
这适用于网络调用,但我意识到网络调用后我的操作员不使用后台线程,而是在主线程上调用。
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> {});
如何确保所有操作都在后台线程上执行?
TL;DR:将 observeOn(AndroidSchedulers.mainThread())
移到 filter(...)
下方。
subscribeOn(...)
用于指定Observable
将开始在哪个线程上运行。对 subscribeOn
的后续调用将被忽略。
因此,如果您要编写以下内容,所有内容 都将在 Schedulers.newThread()
:
上执行
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
现在,当然,这不是你想要的:你想 doSomething
在主线程上。
这就是 observeOn
发挥作用的地方。所有操作 after observeOn
都在该调度程序上执行。因此,在您的示例中,filter
在主线程上执行。
相反,将 observeOn
向下移动到 subscribe
之前:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
现在,filter
将发生在 'new thread' 上,doSomething
将发生在主线程上。
要更进一步,您可以多次使用 observeOn
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
在这种情况下,获取将发生在新线程上,过滤发生在计算线程上,doSomething
发生在主线程上。
查看 ReactiveX - SubscribeOn operator 官方文档。
我正在使用 RxAndroid 进行流操作。在我的实际用例中,我正在从服务器获取列表(使用 Retrofit)。我正在使用调度程序在后台线程上完成工作,并在 Android UI(主)线程上获得最终发射。
这适用于网络调用,但我意识到网络调用后我的操作员不使用后台线程,而是在主线程上调用。
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> {});
如何确保所有操作都在后台线程上执行?
TL;DR:将 observeOn(AndroidSchedulers.mainThread())
移到 filter(...)
下方。
subscribeOn(...)
用于指定Observable
将开始在哪个线程上运行。对 subscribeOn
的后续调用将被忽略。
因此,如果您要编写以下内容,所有内容 都将在 Schedulers.newThread()
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
现在,当然,这不是你想要的:你想 doSomething
在主线程上。
这就是 observeOn
发挥作用的地方。所有操作 after observeOn
都在该调度程序上执行。因此,在您的示例中,filter
在主线程上执行。
相反,将 observeOn
向下移动到 subscribe
之前:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
现在,filter
将发生在 'new thread' 上,doSomething
将发生在主线程上。
要更进一步,您可以多次使用 observeOn
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
在这种情况下,获取将发生在新线程上,过滤发生在计算线程上,doSomething
发生在主线程上。
查看 ReactiveX - SubscribeOn operator 官方文档。