RxJava — 在 subscribeOn() 线程上发出
RxJava — emit on subscribeOn() thread
我有以下代码:
Single.create { emitter ->
// I/O thread here
ThirdPartySDK.doSomeAction {
// Main thread here
emitter.onSuccess(someValue)
}
}
.flatMap {
someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})
ThirdPartySDK.doSomeAction
回调在主线程上发布,因此发射器也会在主线程上发射,而不是在订阅线程上发射(如果我在 flatMap
中进一步进行一些网络交互,链将失败)。
如果我在第一个 Single
之后添加 observeOn(Schedulers.io())
,它会切换到正确的线程,但是有什么方法可以在正确的线程上发出吗?我无法修改 ThirdPartySDK
行为。
订阅
将在给定的调度程序上调用 subscribeActual lambda
观察
将线程切换到给定的调度程序。每个 upstream-onNext 调用都将从 ObserveOn-Scheduler-Thread
如您所说,subscribeOn 只会在给定的 Scheduler-Thread 上调用 subscribeActual 方法调用。这并不意味着下游发出将在同一个线程上。在你的情况下,onSuccess emit 将从不同的线程调用(例如数据库/ Http-ThreadPool 等)。
onSuccess 将从未知线程调用(在您的情况下是主线程)。下游调用将从主线程调用。因此 flatMap 是从主线程调用的。 flatMap 中主线程上的网络调用可能会失败,因为不允许 "block" 主线程。
如何解决这个问题?
只需在 Single#create 之后放置一个 observeOn。主线程调用 onSucess。 observeOn-subscriber 将从主线程调用。 observeOn-subscriber 将 onSuccess 下游调用(例如 flatMap)重定向到给定的 ObserveOn-Scheduler-Thread。因此给出的是,flatMap 是从非主循环线程调用的。
示例:
@Test
fun wurst() {
val thirdPartySDKImpl = ThirdPartySDKImpl()
Single.create<String> { emitter ->
thirdPartySDKImpl.doSomeAction {
emitter.onSuccess(it)
}
}
// .subscribeOn(Schedulers.computation())
// move emit from unknown thread to computation thread
.observeOn(Schedulers.computation())
// Single.just will be subscribe from a computation thread
.flatMap { Single.just(123) }
// move onSucess/ onError emit from computation thread to main-thread
.observeOn(AndroidSchedulers.mainThread())
// subscribe onNext / onError will be called from the main-android-thread
.subscribe({}, {})
}
interface ThirdPartySDK {
fun doSomeAction(callback: (v: String) -> Unit)
}
class ThirdPartySDKImpl : ThirdPartySDK {
override fun doSomeAction(callback: (v: String) -> Unit) {
// <- impl-detail ->
callback("whatever")
}
}
注意:如果 create-lambda 不阻塞或做一些 cpu 繁重的事情,则不需要 subscribeOn。如果它只订阅一个回调,它将从不同的线程调用,则不需要 subscribeOn。
but is there any way to emit on right thread?
您不应在运算符中使用任何并发。你会想,你可以做这样的事情:
Single.create<String> { emitter ->
thirdPartySDKImpl.doSomeAction {
Schedulers.io().scheduleDirect {
emitter.onSuccess(it)
}
}
}
但不推荐这样做,因为您可能会破坏序列化的 onNext 合同^1。此示例将确保 onSucess 下游调用将在预期线程上发生,但未处理取消/取消订阅,并且可能存在其他陷阱。
如果你有一个非反应性的 API 并且你想强制执行一些线程模型,我建议包装同步。 API 使用异步的并提供适当的 observeOn/subscribeOn 运算符。稍后仅使用异步 API.
interface ThirdPartySDKAsync {
fun doSomeAction(): Single<String>
}
class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) :
ThirdPartySDKAsync {
override fun doSomeAction(): Single<String> {
return Single.create<String> { emitter ->
sdk.doSomeAction {
emitter.onSuccess(it)
}
}.observeOn(scheduler)
}
}
进一步阅读:https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
^1 一次只允许调用一个线程onNext/onSuccess/onError/onComplete
我有以下代码:
Single.create { emitter ->
// I/O thread here
ThirdPartySDK.doSomeAction {
// Main thread here
emitter.onSuccess(someValue)
}
}
.flatMap {
someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})
ThirdPartySDK.doSomeAction
回调在主线程上发布,因此发射器也会在主线程上发射,而不是在订阅线程上发射(如果我在 flatMap
中进一步进行一些网络交互,链将失败)。
如果我在第一个 Single
之后添加 observeOn(Schedulers.io())
,它会切换到正确的线程,但是有什么方法可以在正确的线程上发出吗?我无法修改 ThirdPartySDK
行为。
订阅
将在给定的调度程序上调用 subscribeActual lambda
观察
将线程切换到给定的调度程序。每个 upstream-onNext 调用都将从 ObserveOn-Scheduler-Thread
如您所说,subscribeOn 只会在给定的 Scheduler-Thread 上调用 subscribeActual 方法调用。这并不意味着下游发出将在同一个线程上。在你的情况下,onSuccess emit 将从不同的线程调用(例如数据库/ Http-ThreadPool 等)。
onSuccess 将从未知线程调用(在您的情况下是主线程)。下游调用将从主线程调用。因此 flatMap 是从主线程调用的。 flatMap 中主线程上的网络调用可能会失败,因为不允许 "block" 主线程。
如何解决这个问题? 只需在 Single#create 之后放置一个 observeOn。主线程调用 onSucess。 observeOn-subscriber 将从主线程调用。 observeOn-subscriber 将 onSuccess 下游调用(例如 flatMap)重定向到给定的 ObserveOn-Scheduler-Thread。因此给出的是,flatMap 是从非主循环线程调用的。
示例:
@Test
fun wurst() {
val thirdPartySDKImpl = ThirdPartySDKImpl()
Single.create<String> { emitter ->
thirdPartySDKImpl.doSomeAction {
emitter.onSuccess(it)
}
}
// .subscribeOn(Schedulers.computation())
// move emit from unknown thread to computation thread
.observeOn(Schedulers.computation())
// Single.just will be subscribe from a computation thread
.flatMap { Single.just(123) }
// move onSucess/ onError emit from computation thread to main-thread
.observeOn(AndroidSchedulers.mainThread())
// subscribe onNext / onError will be called from the main-android-thread
.subscribe({}, {})
}
interface ThirdPartySDK {
fun doSomeAction(callback: (v: String) -> Unit)
}
class ThirdPartySDKImpl : ThirdPartySDK {
override fun doSomeAction(callback: (v: String) -> Unit) {
// <- impl-detail ->
callback("whatever")
}
}
注意:如果 create-lambda 不阻塞或做一些 cpu 繁重的事情,则不需要 subscribeOn。如果它只订阅一个回调,它将从不同的线程调用,则不需要 subscribeOn。
but is there any way to emit on right thread?
您不应在运算符中使用任何并发。你会想,你可以做这样的事情:
Single.create<String> { emitter ->
thirdPartySDKImpl.doSomeAction {
Schedulers.io().scheduleDirect {
emitter.onSuccess(it)
}
}
}
但不推荐这样做,因为您可能会破坏序列化的 onNext 合同^1。此示例将确保 onSucess 下游调用将在预期线程上发生,但未处理取消/取消订阅,并且可能存在其他陷阱。
如果你有一个非反应性的 API 并且你想强制执行一些线程模型,我建议包装同步。 API 使用异步的并提供适当的 observeOn/subscribeOn 运算符。稍后仅使用异步 API.
interface ThirdPartySDKAsync {
fun doSomeAction(): Single<String>
}
class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) :
ThirdPartySDKAsync {
override fun doSomeAction(): Single<String> {
return Single.create<String> { emitter ->
sdk.doSomeAction {
emitter.onSuccess(it)
}
}.observeOn(scheduler)
}
}
进一步阅读:https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
^1 一次只允许调用一个线程onNext/onSuccess/onError/onComplete