FlatMap 运行 在主线程上甚至使用 Scheuler.io

FlatMap run on main thread even using Scheuler.io

我已经测试过了,肯定是从主线程发出值导致了这个问题。但是,我想知道我是否有这个用例从主线程接收一些值以继续 Rx 流。为了在与 main.

不同的线程中制作 flatMap 运行 应该做什么
class MainActivity : AppCompatActivity() {

    private lateinit var emitter: ObservableEmitter<String>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        btnFlatMap.setOnClickListener {

            val obs = Observable.create<String> {
                emitter = it
                logThread("inside observable")

                // TODO: fetch some configuration from the internet or local db

                // TODO: then call startActivityForResult()
            }

            obs
                .flatMap {
                    logThread("flatMap, Banana")
                    Observable.just("$it, 1 Item")
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ next ->
                    logThread("onNext")
                }, { error ->
                    logThread("onError")
                }, {
                    logThread("onComplete")
                })
        }

        btnEmitter.setOnClickListener {
            // TODO: simulate that onActivityResult is called
            emitter.onNext("Banana")
            emitter.onComplete()
        }
    }

    private fun logThread(operation: String) {
        Log.e("THREAD", "$operation run at [${Thread.currentThread().name}]")
    } }

当前Logcat

在 [RxCachedThreadScheduler-1]

中可观察 运行

flatMap,Banana 运行 在 [main]

onNext 运行 在 [main]

onComplete 运行 在 [main]

预计Logcat

在 [RxCachedThreadScheduler-1]

中可观察 运行

flatMap,Banana 运行 在 [RxCachedThreadScheduler-1]

onNext 运行 在 [main]

onComplete 运行 在 [main]

再添加一个 onserverOn(Schedulers.io()) 就可以了,告诉 Rx 切换线程回到工作线程

        obs
            .observeOn(Schedulers.io()) <------ additional 
            .flatMap {
                logThread("flatMap, Banana")
                Observable.just("$it, 1 Item")
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ next ->
                logThread("onNext")
            }, { error ->
                logThread("onError")
            }, {
                logThread("onComplete")
            })

因为主线程发出的数据,它导致 RX 在订阅主线程时从 subscribeOn() 注册的工作线程 (Schedulers.io) 切换整个下游。为了再次将整个下游的执行线程更改(切换)到工作线程 observeOn() 为此目的。

简而言之

  • observeOn,将下游更改为在特定线程上执行。

  • subscribeOn,将上游(根源)设置为在特定线程上执行。

现在Logcat

flatMap,Banana 运行 在 [RxCachedThreadScheduler-1]

onNext 运行 在 [main]

onComplete 运行 在 [main]