Interval operator in RxJava: which part of the code runs on a background thread when I subscribe on the IO thread?

Code:

class IntervalOperatorFragment : Fragment() {

    private val TAG = IntervalOperatorFragment::class.java.simpleName
    private var _binding: FragmentOperatorIntervalBinding? = null
    private val binding get() = _binding!!

    private val INTERVAL_PERIOD = 1L
    private val MAXIMUM_PERIOD = 5L

    override fun onCreateView(
        inflater: LayoutInflater,
        container: ViewGroup?,
        savedInstanceState: Bundle?
    ): View? {
        _binding = FragmentOperatorIntervalBinding.inflate(inflater, container, false)
        return binding.root
    }

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        onClickListeners()
    }

    override fun onDestroyView() {
        super.onDestroyView()
        _binding = null
    }

    private fun onClickListeners() {
        binding.floatingActionButton.setOnClickListener {
            subscribeToObservable()
        }
    }

    /**
     * Create the observable
     */
    private fun createObservable(): Observable<Long> {
        return Observable.interval(INTERVAL_PERIOD, TimeUnit.SECONDS)
                         .subscribeOn(Schedulers.io())
                         .observeOn(AndroidSchedulers.mainThread())
                         .takeWhile { value ->
                             Timber.tag(TAG).d("Thread: %s", Thread.currentThread())
                             value <= MAXIMUM_PERIOD
                         }
    }

    /**
     * Subscribe to the observable
     */
    private fun subscribeToObservable() {
        createObservable().subscribe(object : Observer<Long> {
            override fun onSubscribe(d: Disposable) {
                Timber.tag(TAG).d("Subscribe Invoked")
            }

            override fun onNext(t: Long) {
                Timber.tag(TAG).d("Value: %s", t)
            }

            override fun onError(e: Throwable) {
                Timber.tag(TAG).e("ERROR: %s", e.message)
            }

            override fun onComplete() {
                Timber.tag(TAG).d("Task is complete")
            }

        })
    }
}

Output:

2020-09-13 03:20:19.334 24452-24452/com.demo.code D/IntervalOperatorFragment: Subscribe Invoked
2020-09-13 03:20:20.345 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:20.348 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 0
2020-09-13 03:20:21.341 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:21.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 1
2020-09-13 03:20:22.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:22.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 2
2020-09-13 03:20:23.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:23.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 3
2020-09-13 03:20:24.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:24.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 4
2020-09-13 03:20:25.341 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:25.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 5
2020-09-13 03:20:26.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:26.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Task is complete

From the Rx documentation: http://reactivex.io/documentation/operators/subscribeon.html

the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears.

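In your code, takeWhile sits below observeOn, so its predicate and every Observer callback run on the main thread. If you put takeWhile before observeOn, it runs on a background thread instead. Strictly speaking, interval emits on the computation scheduler by default, so that is the thread takeWhile will see; subscribeOn(Schedulers.io()) only moves the subscription itself. If the ticks should really come from the IO scheduler, pass it to interval directly: Observable.interval(INTERVAL_PERIOD, TimeUnit.SECONDS, Schedulers.io()).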

Observable.interval(INTERVAL_PERIOD, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .takeWhile { value ->
        Timber.tag(TAG).d("Thread: %s", Thread.currentThread())
        value <= MAXIMUM_PERIOD
    }
    .observeOn(AndroidSchedulers.mainThread())

Anything below observeOn runs on the scheduler passed to the observeOn operator.
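
If you want to check this outside of Android, here is a minimal sketch that should run on a plain JVM. It makes a few assumptions that are not in the question: RxJava 3 package names (for RxJava 2 the imports start with io.reactivex instead), Schedulers.single() standing in for AndroidSchedulers.mainThread(), a 10 ms period to keep it quick, and the hypothetical names ThreadTrace and traceThreads. It records the thread name on each side of observeOn.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Thread names seen above and below observeOn (hypothetical helper type).
data class ThreadTrace(val beforeObserveOn: String, val afterObserveOn: String)

// Assumption: Schedulers.single() replaces AndroidSchedulers.mainThread()
// so that this runs without the Android main looper.
fun traceThreads(): ThreadTrace =
    Observable.interval(10, TimeUnit.MILLISECONDS)   // ticks come from the computation scheduler by default
        .subscribeOn(Schedulers.io())                // only the act of subscribing happens on an IO thread
        .takeWhile { value -> value <= 2 }           // above observeOn: runs on the emitting thread
        .map { Thread.currentThread().name }         // still above observeOn
        .observeOn(Schedulers.single())              // everything below hops to this scheduler
        .map { upstream -> ThreadTrace(upstream, Thread.currentThread().name) }
        .blockingLast()                              // wait for completion, keep the last trace

fun main() {
    val trace = traceThreads()
    println("above observeOn: ${trace.beforeObserveOn}")
    println("below observeOn: ${trace.afterObserveOn}")
}
```

With stock RxJava schedulers, the first name should start with RxComputationThreadPool and the second with RxSingleScheduler, which matches the rule above: operators above observeOn run wherever the source emits, and operators below it run on the scheduler you hand to observeOn.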