RxJava 使用 Stream API 替代并行计算大阶乘

RxJava as replacement for parallel calculating big factorials with StreamAPI

我创建了一个函数来计算使用多个线程计算阶乘所需的时间。 它是使用 StreamAPI 创建的。

static Long streamParallelFactorial(int number) {

        long czasRozpoczecia = System.currentTimeMillis();
        IntStream.range(1, number).parallel().mapToObj(BigInteger::valueOf).reduce(BigInteger.ONE,
                BigInteger::multiply);
        long czasZakonczenia = System.currentTimeMillis();

        return czasZakonczenia - czasRozpoczecia;
    }

现在我正在尝试做同样的事情,但使用的是 RxJava。上班的时候,这个怪物出来了。

static Long rxJavaParallelFactorial(int number) {
    long czasRozpoczecia = System.currentTimeMillis();

    Observable<Integer> vals = Observable.range(1,number);
    vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .scan(BigInteger.ONE, (big,cur) ->
            big.multiply(BigInteger.valueOf(cur))));


    long czasZakonczenia = System.currentTimeMillis();
    return czasZakonczenia - czasRozpoczecia;
}

有人会很友善并建议我应该怎么做才能让它发挥作用? 提前谢谢你。

你可以在 RxJava 2+ 中进行类似于 Stream 的并行计算:

Flowable.range(1, number)
.parallel()
.runOn(Schedulers.computation())
.map(BigInteger:valueOf)
.reduce(() -> BigInteger.ONE, BigInteger::multiply)
.reduce(BigInteger::multiply)
.subscribe();