RxJava 调度器的用例

Use cases for RxJava schedulers

在RxJava中有5 different schedulers种可供选择:

  1. immediate(): Creates and returns a Scheduler that executes work immediately on the current thread.

  2. trampoline(): Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

  3. newThread(): Creates and returns a Scheduler that creates a new Thread for each unit of work.

  4. computation(): Creates and returns a Scheduler intended for computational work. This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler. Use Schedulers.io() instead.

  5. io(): Creates and returns a Scheduler intended for IO-bound work. The implementation is backed by an Executor thread-pool that will grow as needed. This can be used for asynchronously performing blocking IO. Do not perform computational work on this scheduler. Use Schedulers.computation() instead.

问题:

前 3 个调度程序很容易理解;但是,我对 computationio.

有点困惑
  1. "IO-bound work"到底是什么?它用于处理流(java.io)和文件(java.nio.files)吗?它用于数据库查询吗?是用来下载文件还是访问REST API?
  2. computation()newThread() 有何不同?是否所有 computation() 调用都在单个(后台)线程上,而不是每次都在一个新的(后台)线程上?
  3. 为什么在做 IO 工作时调用 computation() 不好?
  4. 为什么在进行计算工作时调用 io() 不好?

很好的问题,我认为文档可以提供更多细节。

  1. io() 由无界线程池支持,是用于非计算密集型任务的那种东西,不会给 CPU.所以是的,与文件系统的交互、与不同主机上的数据库或服务的交互都是很好的例子。
  2. computation() 由一个大小等于可用处理器数量的有界线程池支持。如果您尝试在多个可用处理器上并行安排 CPU 密集型工作(比如使用 newThread()),那么您将面临线程创建开销和上下文切换开销,因为线程会争夺处理器,并且它可能性能受到很大影响。
  3. 最好只将 computation() 留给 CPU 密集型工作,否则您将无法获得良好的 CPU 利用率。
  4. 由于 2 中讨论的原因,调用 io() 进行计算工作是不好的。io() 是无限制的,如果您在 io() 上并行安排一千个计算任务,那么每个那一千个任务每个都有自己的线程并竞争 CPU 产生上下文切换成本。

最重要的一点是 Schedulers.ioSchedulers.computation 都由无限线程池支持,因为反对问题中提到的其他人。此特性仅由 Schedulers.from(Executor)Executor 是使用 newCachedThreadPool[ 创建的情况下共享的=50=](不受自动回收线程池限制)。

正如之前的回复和网络上的多篇文章所详尽解释的那样,Schedulers.ioSchedulers.computation 应请谨慎使用,因为它们已针对其名称中的工作类型进行了优化。但是,在我看来,它们最重要的作用是 为反应流提供真正的并发性

与新手的看法相反,反应流不是天生并发的,而是天生异步和顺序的。出于这个原因,仅当 I/O 操作阻塞时才应使用 Schedulers.io(例如:使用诸如 Apache IOUtils 之类的阻塞命令FileUtils.readFileAsString(...)) 因此会冻结调用线程,直到操作完成。

使用诸如 Java AsynchronousFileChannel(...) 之类的异步方法不会在操作期间阻塞调用线程,因此没有必要使用单独的线程。事实上,Schedulers.io 线程并不适合异步操作,因为它们没有 运行 事件循环并且回调永远不会......打电话。

相同的逻辑适用于数据库访问或远程 API 调用。如果可以使用异步或反应式 API 进行调用,请不要使用 Schedulers.io

回到并发。您可能无法访问异步或反应式 API 来异步或并发执行 I/O 操作,因此您唯一的选择是在单独的线程上分派多个调用。唉,反应流在它们的末端是顺序的,但好消息是 flatMap() 运算符可以在它们的末端引入并发核心.

并发必须在流构造中构建,通常使用 flatMap() 运算符。这个强大的运算符可以配置为在内部为您的 flatMap() 嵌入式 Function 提供多线程上下文。该上下文由多线程调度程序提供,例如 Scheduler.ioScheduler.computation

在有关 RxJava2 Schedulers and Concurrency 的文章中找到更多详细信息,您可以在其中找到代码示例以及有关如何顺序和并发使用调度程序的详细说明。

希望这对您有所帮助,

软杰克

This blog post provides an excellent answer

来自博客 post:

Schedulers.io() 由无限线程池支持。它用于非 CPU 密集型 I/O 类型的工作,包括与文件系统交互、执行网络调用、数据库交互等。此线程池旨在用于异步执行阻塞 IO。

Schedulers.computation() 由一个大小不超过可用处理器数量的有界线程池支持。它用于计算或 CPU 密集型工作,例如调整图像大小、处理大型数据集等。注意:当您分配的计算线程多于可用内核时,由于上下文切换和线程创建开销,性能会下降作为线程争夺处理器的时间。

Schedulers.newThread() 为计划的每个工作单元创建一个新线程。这个调度程序很昂贵,因为每次都会产生新线程并且不会发生重用。

Schedulers.from(Executor executor) 创建并 returns 由指定执行器支持的自定义调度程序。要限制线程池中的并发线程数,请使用 Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果一个任务在所有线程都被占用时被调度,它就会被排队。池中的线程将一直存在,直到它被显式关闭。

主线程或AndroidSchedulers.mainThread()是由RxAndroid扩展库提供给RxJava的。主线程(也称为 UI 线程)是用户交互发生的地方。应注意不要使该线程超载,以防止卡顿无响应 UI 或更糟的是,“应用程序无响应”(ANR) 对话框。

Schedulers.single() 是 RxJava 2 中的新功能。此调度程序由按请求顺序依次执行任务的单个线程支持。

Schedulers.trampoline() 由其中一个参与的工作线程以 FIFO(先进先出)方式执行任务。它经常在实现递归时使用,以避免增加调用堆栈。