RxJava 在找到第一个结果时中断并行执行
RxJava interrupt parallel execution when first result found
我是 运行 一些并行延迟工作的服务。任务是不等待所有服务执行完毕。
Observable.just(2, 3, 5)
.map(delay -> serviceReturningSingleWithDelay(delay))
.toList()
.flatMap(list ->
Single.zip(list, output -> Arrays.stream(output)
.map(delay -> (Integer) delay)
.filter(delay -> delay == 3)
.findFirst()
.orElse(0)
))
.subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
}
现在我的输出是:
Delay 2: Thread : RxComputationThreadPool-1
Delay 3: Thread : RxComputationThreadPool-2
Delay 5: Thread : RxComputationThreadPool-3
3
期望的结果是在 RxComputationThreadPool-3 线程完成执行之前获得过滤值 - 3。
如果有任何想法,我将不胜感激。
如果你想 运行 它们并行并在收到值 3 时退出,你不需要使用 zip
。而是使用 takeWhile
来中断您的可观察对象,如下所示:
Observable.just(2, 3, 5)
.flatMapSingle(this::serviceReturningSingleWithDelay)
.takeWhile(e -> e != 3)
.subscribe(System.out::println);
如果您想要 3
值,请使用 takeUntil(e -> e == 3)
而不是 takeWhile(e -> e != 3)
基于@bubbles 答案的另一种方式。如果你想保留 flatmap
而在 Kotlin 中
fun main(args: Array<String>) {
Observable.just(5L, 8L, 10L)
.flatMap {
serviceReturningSingleWithDelay(it).toObservable()
}
.takeWhile { delay -> delay != 8L }
.subscribeBy(
onNext = { println("Delay period $it")}
)
Thread.sleep(10000)
}
private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess {
println("Delay $it Thread ${Thread.currentThread().name}")
}
}
我是 运行 一些并行延迟工作的服务。任务是不等待所有服务执行完毕。
Observable.just(2, 3, 5)
.map(delay -> serviceReturningSingleWithDelay(delay))
.toList()
.flatMap(list ->
Single.zip(list, output -> Arrays.stream(output)
.map(delay -> (Integer) delay)
.filter(delay -> delay == 3)
.findFirst()
.orElse(0)
))
.subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
}
现在我的输出是:
Delay 2: Thread : RxComputationThreadPool-1
Delay 3: Thread : RxComputationThreadPool-2
Delay 5: Thread : RxComputationThreadPool-3
3
期望的结果是在 RxComputationThreadPool-3 线程完成执行之前获得过滤值 - 3。 如果有任何想法,我将不胜感激。
如果你想 运行 它们并行并在收到值 3 时退出,你不需要使用 zip
。而是使用 takeWhile
来中断您的可观察对象,如下所示:
Observable.just(2, 3, 5)
.flatMapSingle(this::serviceReturningSingleWithDelay)
.takeWhile(e -> e != 3)
.subscribe(System.out::println);
如果您想要 3
值,请使用 takeUntil(e -> e == 3)
而不是 takeWhile(e -> e != 3)
基于@bubbles 答案的另一种方式。如果你想保留 flatmap
而在 Kotlin 中
fun main(args: Array<String>) {
Observable.just(5L, 8L, 10L)
.flatMap {
serviceReturningSingleWithDelay(it).toObservable()
}
.takeWhile { delay -> delay != 8L }
.subscribeBy(
onNext = { println("Delay period $it")}
)
Thread.sleep(10000)
}
private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess {
println("Delay $it Thread ${Thread.currentThread().name}")
}
}