RxJava如何在Subscriber的其他线程中立即获取所有元素?

How to get all elements immediately in other threads in Subscriber in RxJava?

如果我执行这样的代码:

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    Flowable.fromIterable(Lists.newArrayList(1, 2, 3, 4, 5, 6))
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Throwable {
                    System.out.println(stopWatch + " value:" + integer);
                    Thread.sleep(1000);
                }
            });
    Thread.sleep(100000);

在输出中,我在睡眠时间后得到每个元素,如下所示:

00:00:00.027 value:1
00:00:01.030 value:2
00:00:02.030 value:3
00:00:03.031 value:4
00:00:04.031 value:5
00:00:05.031 value:6

但据我所知,如果我使用 Schedulers.io(),那么我必须并行获取所有值,而且我预计我会立即获取所有值,而不是等待 1000 磨机一次,仅此而已

像这样:

00:00:00.027 value:1
00:00:00.030 value:2
00:00:00.030 value:3
00:00:00.031 value:4
00:00:00.031 value:5
00:00:00.031 value:6

我怎样才能在其他线程中全部获取它们,而不是一个一个地获取?

我不想让他们互相等待

我尝试了Schedulers.computation()和其他的,但他们还是一个接一个地到达

如何立即全部获取?

P.S。 google 中有一些文字可以更好地搜索。我从浏览器历史记录中得到它。 rxjava 如何实现多个订阅者,rxjava 如何实现 post 所有元素同步,只有一个线程处于活动状态

if i use Schedulers.io(), than i must get all the values parallel

没有。 RxJava 流程默认是顺序的,这意味着项目一个接一个地交付。如果您的消费者阻塞或睡眠,后续项目每次都会延迟。

How can i get them all in other threads, NOT one by one?

使用parallel:

StopWatch stopWatch = new StopWatch();
stopWatch.start();
Flowable.fromIterable(Lists.newArrayList(1, 2, 3, 4, 5, 6))
        .parallel()
        .runOn(Schedulers.io())
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                System.out.println(stopWatch + " value:" + integer);
                Thread.sleep(1000);
            }
        })
        .sequential()
        .subscribe();

Thread.sleep(100000);

推荐阅读:https://github.com/ReactiveX/RxJava#parallel-processing