Android RxJava 和可观察对象的链接

Android RxJava and chaining of observables

我正在尝试将几个可观察对象链接在一起,并根据已执行的可观察对象执行一些操作。但是我遇到了奇怪的行为。

class MainActivity : AppCompatActivity() {

    val TAG: String = MainActivity::class.java.name

    private lateinit var clicker: TextView

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        clicker = findViewById(R.id.clicker) as TextView
        clicker.setOnClickListener {
            val i = AtomicInteger()

            getFirstObservable()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext {
                    showMessage(i, it)
                }
                .flatMap { getSecondObservable() }
                .doOnNext {
                    showMessage(i, it)
                }
                .flatMap { getThirdObservable() }
                .doOnNext {
                    showMessage(i, it)
                }
                .subscribe()
        }
    }

    fun getFirstObservable(): Observable<String> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            "Hello"
        }
    }

    fun getSecondObservable(): Observable<Int> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            3
        }
    }

    fun getThirdObservable(): Observable<String> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            "World!"
        }
    }

    fun showMessage(i: AtomicInteger, obj: Any) {
        val msg = "Message #${i.incrementAndGet()}: from ${Thread.currentThread().name}: $obj"
        Log.e(TAG, msg)
        clicker.text = msg
        Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
    }
}

在此示例中,日志将每 2 秒显示一次,但所有视图更改都将在最后一个可观察对象完成时完成。

12-04 01:11:30.465 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #1: from main: Hello
12-04 01:11:32.473 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #2: from main: 3
12-04 01:11:34.479 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #3: from main: World!

我认为这是 AndroidScheduler.mainThread() 的行为,因为当我删除这一行并用这样的视图包装更改时

Handler(Looper.getMainLooper()).post {
    clicker.text = msg
    Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
}

行为变得正确。那么有人可以解释这种行为并提出解决此问题的正确方法吗?

你的大部分代码都在主线程上执行,包括睡眠。当创建一个可观察对象时,除非另有说明,否则它会在当前线程上订阅和观察。当您创建第二个和第三个可观察对象时,它们在主线程上。此外,由于可观察工作没有后台,当您订阅时,它会立即在当前线程上执行。因此,所有的工作和观察都发生在主线程上,而不会返回到 android OS。 UI 在主线程上被阻塞等待时间。如果您增加这些睡眠时间,则可以强制执行 ANR。要修复它,您可以为每个可观察对象指定 observeOnsubscribeOn,以将工作推送到每个可观察对象的计算线程。

getFirstObservable().subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .flatMap {
                        getSecondObservable().subscribeOn(Schedulers.computation())
                                             .observeOn(AndroidSchedulers.mainThread()) 
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .flatMap { 
                        getThirdObservable().subscribeOn(Schedulers.computation())
                                            .observeOn(AndroidSchedulers.mainThread()) 
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .subscribe()