RxJava 在找到第一个结果时中断并行执行

RxJava interrupt parallel execution when first result found

我是 运行 一些并行延迟工作的服务。任务是不等待所有服务执行完毕。

    Observable.just(2, 3, 5)
                .map(delay -> serviceReturningSingleWithDelay(delay))
                .toList()
                .flatMap(list ->
                        Single.zip(list, output -> Arrays.stream(output)
                                .map(delay -> (Integer) delay)
                                .filter(delay -> delay == 3)
                                .findFirst()
                                .orElse(0)
                        ))
                .subscribe(System.out::println);
 private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
        return Single.just(delay)
                .delay(delay, TimeUnit.SECONDS)
                .doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
    }

现在我的输出是:

Delay 2: Thread : RxComputationThreadPool-1 
Delay 3: Thread : RxComputationThreadPool-2 
Delay 5: Thread : RxComputationThreadPool-3 
3

期望的结果是在 RxComputationThreadPool-3 线程完成执行之前获得过滤值 - 3。 如果有任何想法,我将不胜感激。

如果你想 运行 它们并行并在收到值 3 时退出,你不需要使用 zip。而是使用 takeWhile 来中断您的可观察对象,如下所示:

Observable.just(2, 3, 5)
          .flatMapSingle(this::serviceReturningSingleWithDelay)
          .takeWhile(e -> e != 3)
          .subscribe(System.out::println);

如果您想要 3 值,请使用 takeUntil(e -> e == 3) 而不是 takeWhile(e -> e != 3)

基于@bubbles 答案的另一种方式。如果你想保留 flatmap 而在 Kotlin 中

fun main(args: Array<String>) {
    Observable.just(5L, 8L, 10L)
            .flatMap {
                serviceReturningSingleWithDelay(it).toObservable()
            }
            .takeWhile { delay -> delay != 8L }
            .subscribeBy(
                    onNext = { println("Delay period $it")}
            )

    Thread.sleep(10000)
}

private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
    return Single.just(delay)
            .delay(delay, TimeUnit.SECONDS)
            .doOnSuccess {
                println("Delay $it Thread ${Thread.currentThread().name}")
            }
}