使用 ForkJoinPool.commonPool() 代替 RxJava 的计算调度器
Using ForkJoinPool.commonPool() in place of RxJava's computation scheduler
我目前正在为个人项目实施 RxJava2 的 API 的一小部分。我有一个基于侦听器的 API,我开始编写代码来包装我的侦听器并实现 Flowable.create()
:
public class EventBus {
private final Flowable<Event> events;
public EventBus(MyApi api) {
this.events = Flowable.create(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event event) {
emitter.onNext(event);
}
};
api.addListener(listener);
// TODO api.removeListener(listener)
}, BackpressureStrategy.BUFFER);
}
}.
我 运行 进行了快速测试,它工作得很好,但我意识到它是 单线程 。没什么大不了的:实际上 RxJava 的设计是单线程的,除非指定 Scheduler
.
根据 RxJava2 的文档,我决定链接一个 Flowable.subscribeOn()
调用,我将使用 Scheduler.computation()
参数调用它。
所以我继续实施 Flowable.subscribeOn()
和 Scheduler.computation()
,这就是我试图解决的问题:我在不同的地方看到 Java的ForkJoinPool.commonPool()
被推荐到运行计算任务上,但是RxJava2不使用它。我的主要问题是:
- 这是否适合我非常基本的响应式实施?
- 为什么 RxJava2 选择实现自己的池而不是使用这个池?
- 为了让我的生活更轻松,我应该注意这种方法有什么问题吗?
谢谢!
Would this be a good fit for my very basic reactive implementation?
为传统的回调 api 编写适配器有点常见,假设还没有编写的话。
Why did RxJava2 choose to implement its own pool instead of using this one?
RxJava 始终以 Java 6 及更高版本为目标,而 ForkJoinPool
是 Java 7。请注意,commonPool
有时会在调用者线程上执行工作,并且不会异步。由于 same-pool 死锁,我们有许多单元测试挂在 CI 服务器上。
implementing a small subset of RxJava2's API
Are there any problems with this approach that I should be aware of to make my life easier down the road?
你这是什么意思?您是在尝试重新实现 RxJava 2 库,还是在您的代码中 使用 和 应用 现有运算符?
如果是后者,除了 commonPool
的警告之外,这取决于您尝试与之交互的内容。
我目前正在为个人项目实施 RxJava2 的 API 的一小部分。我有一个基于侦听器的 API,我开始编写代码来包装我的侦听器并实现 Flowable.create()
:
public class EventBus {
private final Flowable<Event> events;
public EventBus(MyApi api) {
this.events = Flowable.create(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event event) {
emitter.onNext(event);
}
};
api.addListener(listener);
// TODO api.removeListener(listener)
}, BackpressureStrategy.BUFFER);
}
}.
我 运行 进行了快速测试,它工作得很好,但我意识到它是 单线程 。没什么大不了的:实际上 RxJava 的设计是单线程的,除非指定 Scheduler
.
根据 RxJava2 的文档,我决定链接一个 Flowable.subscribeOn()
调用,我将使用 Scheduler.computation()
参数调用它。
所以我继续实施 Flowable.subscribeOn()
和 Scheduler.computation()
,这就是我试图解决的问题:我在不同的地方看到 Java的ForkJoinPool.commonPool()
被推荐到运行计算任务上,但是RxJava2不使用它。我的主要问题是:
- 这是否适合我非常基本的响应式实施?
- 为什么 RxJava2 选择实现自己的池而不是使用这个池?
- 为了让我的生活更轻松,我应该注意这种方法有什么问题吗?
谢谢!
Would this be a good fit for my very basic reactive implementation?
为传统的回调 api 编写适配器有点常见,假设还没有编写的话。
Why did RxJava2 choose to implement its own pool instead of using this one?
RxJava 始终以 Java 6 及更高版本为目标,而 ForkJoinPool
是 Java 7。请注意,commonPool
有时会在调用者线程上执行工作,并且不会异步。由于 same-pool 死锁,我们有许多单元测试挂在 CI 服务器上。
implementing a small subset of RxJava2's API Are there any problems with this approach that I should be aware of to make my life easier down the road?
你这是什么意思?您是在尝试重新实现 RxJava 2 库,还是在您的代码中 使用 和 应用 现有运算符?
如果是后者,除了 commonPool
的警告之外,这取决于您尝试与之交互的内容。