Rxjava Scheduler.trampoline 与 concatmap
Rxjava Scheduler.trampoline versus concatmap
根据文档,Scheduler.trampoline 似乎可以确保元素发出先进先出(即按顺序)。似乎 concat map 的要点是确保所有内容都正确排列然后发出。所以我想知道在应用 subscribeOn./.observeOn(Scheduler.trampoline()) 之后是否有任何意义,然后再执行 concatmap 运算符而不是常规映射操作。
是的,有一点。举个例子:
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.trampoline())
.flatMap(
a -> {
if (a < 3) {
return Observable.just(a).delay(3, TimeUnit.SECONDS);
} else {
return Observable.just(a);
}
})
.doOnNext(
a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
.subscribe();
这是输出:
Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
这里发生的是 1 和 2 依次到达 flatMap
运算符。但是现在,这些元素的内部流延迟了 3 秒。请注意 flatMap
热切地 subscribes
到内部流。也就是说,在 subscribing
到下一个内部流(这就是 concatMap 所做的)之前,它不会等待一个流完成(使用 onComplete
)。
所以1和2的内部流延迟了3秒。您可以说这是一个需要一些时间的外部 I/O 调用。同时,接下来的 3 个元素 (3,4,5) 进入 flatMap
并且它们的流立即结束。这就是您看到输出中保留序列的原因。
然后 3 秒结束,元素 1 和 2 被发射。请注意,不能保证 1 会先于 2。
现在将 flatMap
替换为 concatMap
,您会看到序列保持不变:
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
Element: 3, on: RxComputationScheduler-2
Element: 4, on: RxComputationScheduler-2
Element: 5, on: RxComputationScheduler-2
为什么?因为这就是 concatMap
的工作原理。元素 1 出现,并在 I/O 调用中使用。在与其内部流对应的内部流发出 onComplete
之前需要 3 秒。在第一个流发出 onComplete 之前,concatMap
不会订阅对应于剩余元素的内部流。一旦这样做,下一个流 (Observable.just(2).delay(3, TimeUnit.SECONDS)
) 就是 subscribed
,依此类推。所以你可以看到订单是如何维护的。
关于这两个运算符,您需要记住的是:flatMap
急切地 subscribes
到内部流,当元素到达时。另一方面,concatMap
在它 subscribes
到下一个之前等待一个流完成。这就是为什么您不能使用 concatMap
.
进行并行调用的原因
不是真的。 trampoline
本质上是在其中一个线程上执行工作,以 FIFO 顺序调用其 Worker.schedule
方法。
在Observable.subscribeOn(Schedulers.trampoline())
的情况下,它将是线程订阅,因此应用它没有实际效果。
在Observable.observeOn(Schedulers.trampoline())
的情况下,它将是线程信号项,因此那里也没有实际作用。
concatMap
在发送上游项信号的线程或内部 Observable
完成的线程上执行映射器函数。操作员本质上已经有一个内置的蹦床,因此上游项目和下游完成不会重叠。在3.x中,会有一个overload取一个Scheduler
,Schedulers.trampoline()
也没有实际效果。
Schedulers.trampoline()
的最佳用例是在不需要异步的单元测试中。因此,您要么参数化您的 subscribeOn
/observeOn
用法,要么使用调度程序挂钩并替换标准调度程序:
RxJavaPlugins.setComputationSchedulerHandler(s -> Schedulers.trampoline());
RxJavaPlugins.setIoSchedulerHandler(s -> Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(s -> Schedulers.trampoline());
完成后,
RxJavaPlugins.reset();
根据文档,Scheduler.trampoline 似乎可以确保元素发出先进先出(即按顺序)。似乎 concat map 的要点是确保所有内容都正确排列然后发出。所以我想知道在应用 subscribeOn./.observeOn(Scheduler.trampoline()) 之后是否有任何意义,然后再执行 concatmap 运算符而不是常规映射操作。
是的,有一点。举个例子:
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.trampoline())
.flatMap(
a -> {
if (a < 3) {
return Observable.just(a).delay(3, TimeUnit.SECONDS);
} else {
return Observable.just(a);
}
})
.doOnNext(
a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
.subscribe();
这是输出:
Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
这里发生的是 1 和 2 依次到达 flatMap
运算符。但是现在,这些元素的内部流延迟了 3 秒。请注意 flatMap
热切地 subscribes
到内部流。也就是说,在 subscribing
到下一个内部流(这就是 concatMap 所做的)之前,它不会等待一个流完成(使用 onComplete
)。
所以1和2的内部流延迟了3秒。您可以说这是一个需要一些时间的外部 I/O 调用。同时,接下来的 3 个元素 (3,4,5) 进入 flatMap
并且它们的流立即结束。这就是您看到输出中保留序列的原因。
然后 3 秒结束,元素 1 和 2 被发射。请注意,不能保证 1 会先于 2。
现在将 flatMap
替换为 concatMap
,您会看到序列保持不变:
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
Element: 3, on: RxComputationScheduler-2
Element: 4, on: RxComputationScheduler-2
Element: 5, on: RxComputationScheduler-2
为什么?因为这就是 concatMap
的工作原理。元素 1 出现,并在 I/O 调用中使用。在与其内部流对应的内部流发出 onComplete
之前需要 3 秒。在第一个流发出 onComplete 之前,concatMap
不会订阅对应于剩余元素的内部流。一旦这样做,下一个流 (Observable.just(2).delay(3, TimeUnit.SECONDS)
) 就是 subscribed
,依此类推。所以你可以看到订单是如何维护的。
关于这两个运算符,您需要记住的是:flatMap
急切地 subscribes
到内部流,当元素到达时。另一方面,concatMap
在它 subscribes
到下一个之前等待一个流完成。这就是为什么您不能使用 concatMap
.
不是真的。 trampoline
本质上是在其中一个线程上执行工作,以 FIFO 顺序调用其 Worker.schedule
方法。
在Observable.subscribeOn(Schedulers.trampoline())
的情况下,它将是线程订阅,因此应用它没有实际效果。
在Observable.observeOn(Schedulers.trampoline())
的情况下,它将是线程信号项,因此那里也没有实际作用。
concatMap
在发送上游项信号的线程或内部 Observable
完成的线程上执行映射器函数。操作员本质上已经有一个内置的蹦床,因此上游项目和下游完成不会重叠。在3.x中,会有一个overload取一个Scheduler
,Schedulers.trampoline()
也没有实际效果。
Schedulers.trampoline()
的最佳用例是在不需要异步的单元测试中。因此,您要么参数化您的 subscribeOn
/observeOn
用法,要么使用调度程序挂钩并替换标准调度程序:
RxJavaPlugins.setComputationSchedulerHandler(s -> Schedulers.trampoline());
RxJavaPlugins.setIoSchedulerHandler(s -> Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(s -> Schedulers.trampoline());
完成后,
RxJavaPlugins.reset();