使用 ForkJoinPool.commonPool() 代替 RxJava 的计算调度器

Using ForkJoinPool.commonPool() in place of RxJava's computation scheduler

我目前正在为个人项目实施 RxJava2 的 API 的一小部分。我有一个基于侦听器的 API,我开始编写代码来包装我的侦听器并实现 Flowable.create():

public class EventBus {
    private final Flowable<Event> events;

    public EventBus(MyApi api) {
        this.events = Flowable.create(emitter -> {
            Callback listener = new Callback() {
                @Override
                public void onEvent(Event event) {
                    emitter.onNext(event);
                }
            };
            api.addListener(listener);
            // TODO api.removeListener(listener)
        }, BackpressureStrategy.BUFFER);
    }
}.

我 运行 进行了快速测试,它工作得很好,但我意识到它是 单线程 。没什么大不了的:实际上 RxJava 的设计是单线程的,除非指定 Scheduler.

根据 RxJava2 的文档,我决定链接一个 Flowable.subscribeOn() 调用,我将使用 Scheduler.computation() 参数调用它。

所以我继续实施 Flowable.subscribeOn()Scheduler.computation(),这就是我试图解决的问题:我在不同的地方看到 Java的ForkJoinPool.commonPool()被推荐到运行计算任务上,但是RxJava2不使用它。我的主要问题是:

谢谢!

Would this be a good fit for my very basic reactive implementation?

为传统的回调 api 编写适配器有点常见,假设还没有编写的话。

Why did RxJava2 choose to implement its own pool instead of using this one?

RxJava 始终以 Java 6 及更高版本为目标,而 ForkJoinPool 是 Java 7。请注意,commonPool 有时会在调用者线程上执行工作,并且不会异步。由于 same-pool 死锁,我们有许多单元测试挂在 CI 服务器上。

implementing a small subset of RxJava2's API Are there any problems with this approach that I should be aware of to make my life easier down the road?

你这是什么意思?您是在尝试重新实现 RxJava 2 库,还是在您的代码中 使用 应用 现有运算符?

如果是后者,除了 commonPool 的警告之外,这取决于您尝试与之交互的内容。