FlatMap 运行 在主线程上甚至使用 Scheuler.io
FlatMap run on main thread even using Scheuler.io
我已经测试过了,肯定是从主线程发出值导致了这个问题。但是,我想知道我是否有这个用例从主线程接收一些值以继续 Rx 流。为了在与 main.
不同的线程中制作 flatMap 运行 应该做什么
class MainActivity : AppCompatActivity() {
private lateinit var emitter: ObservableEmitter<String>
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
btnFlatMap.setOnClickListener {
val obs = Observable.create<String> {
emitter = it
logThread("inside observable")
// TODO: fetch some configuration from the internet or local db
// TODO: then call startActivityForResult()
}
obs
.flatMap {
logThread("flatMap, Banana")
Observable.just("$it, 1 Item")
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ next ->
logThread("onNext")
}, { error ->
logThread("onError")
}, {
logThread("onComplete")
})
}
btnEmitter.setOnClickListener {
// TODO: simulate that onActivityResult is called
emitter.onNext("Banana")
emitter.onComplete()
}
}
private fun logThread(operation: String) {
Log.e("THREAD", "$operation run at [${Thread.currentThread().name}]")
} }
当前Logcat
在 [RxCachedThreadScheduler-1]
中可观察 运行
flatMap,Banana 运行 在 [main]
onNext 运行 在 [main]
onComplete 运行 在 [main]
预计Logcat
在 [RxCachedThreadScheduler-1]
中可观察 运行
flatMap,Banana 运行 在 [RxCachedThreadScheduler-1]
onNext 运行 在 [main]
onComplete 运行 在 [main]
再添加一个 onserverOn(Schedulers.io())
就可以了,告诉 Rx 切换线程回到工作线程
obs
.observeOn(Schedulers.io()) <------ additional
.flatMap {
logThread("flatMap, Banana")
Observable.just("$it, 1 Item")
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ next ->
logThread("onNext")
}, { error ->
logThread("onError")
}, {
logThread("onComplete")
})
因为主线程发出的数据,它导致 RX 在订阅主线程时从 subscribeOn()
注册的工作线程 (Schedulers.io) 切换整个下游。为了再次将整个下游的执行线程更改(切换)到工作线程 observeOn()
为此目的。
简而言之
observeOn
,将下游更改为在特定线程上执行。
subscribeOn
,将上游(根源)设置为在特定线程上执行。
现在Logcat
flatMap,Banana 运行 在 [RxCachedThreadScheduler-1]
onNext 运行 在 [main]
onComplete 运行 在 [main]
我已经测试过了,肯定是从主线程发出值导致了这个问题。但是,我想知道我是否有这个用例从主线程接收一些值以继续 Rx 流。为了在与 main.
不同的线程中制作 flatMap 运行 应该做什么class MainActivity : AppCompatActivity() {
private lateinit var emitter: ObservableEmitter<String>
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
btnFlatMap.setOnClickListener {
val obs = Observable.create<String> {
emitter = it
logThread("inside observable")
// TODO: fetch some configuration from the internet or local db
// TODO: then call startActivityForResult()
}
obs
.flatMap {
logThread("flatMap, Banana")
Observable.just("$it, 1 Item")
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ next ->
logThread("onNext")
}, { error ->
logThread("onError")
}, {
logThread("onComplete")
})
}
btnEmitter.setOnClickListener {
// TODO: simulate that onActivityResult is called
emitter.onNext("Banana")
emitter.onComplete()
}
}
private fun logThread(operation: String) {
Log.e("THREAD", "$operation run at [${Thread.currentThread().name}]")
} }
当前Logcat
在 [RxCachedThreadScheduler-1]
中可观察 运行flatMap,Banana 运行 在 [main]
onNext 运行 在 [main]
onComplete 运行 在 [main]
预计Logcat
在 [RxCachedThreadScheduler-1]
中可观察 运行flatMap,Banana 运行 在 [RxCachedThreadScheduler-1]
onNext 运行 在 [main]
onComplete 运行 在 [main]
再添加一个 onserverOn(Schedulers.io())
就可以了,告诉 Rx 切换线程回到工作线程
obs
.observeOn(Schedulers.io()) <------ additional
.flatMap {
logThread("flatMap, Banana")
Observable.just("$it, 1 Item")
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ next ->
logThread("onNext")
}, { error ->
logThread("onError")
}, {
logThread("onComplete")
})
因为主线程发出的数据,它导致 RX 在订阅主线程时从 subscribeOn()
注册的工作线程 (Schedulers.io) 切换整个下游。为了再次将整个下游的执行线程更改(切换)到工作线程 observeOn()
为此目的。
简而言之
observeOn
,将下游更改为在特定线程上执行。subscribeOn
,将上游(根源)设置为在特定线程上执行。
现在Logcat
flatMap,Banana 运行 在 [RxCachedThreadScheduler-1]
onNext 运行 在 [main]
onComplete 运行 在 [main]