使用 Reactor 作为任务执行器的正确方法

Right way to use Reactor as a task executor

我正在研究 documentation 一种将 Reactor 用作任务执行器的方法。我想提交一些任务,将其并行化,然后等待它像 ThreadService 一样终止,示例如下:

final ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(() -> {sleep(200);System.out.println(1);});
pool.submit(() -> System.out.println(2));
pool.submit(() -> System.out.println(3));
pool.submit(() -> System.out.println(4));
pool.submit(() -> System.out.println(5));
pool.submit(() -> System.out.println(6));
pool.submit(() -> System.out.println(7));
pool.submit(() -> System.out.println(8));
pool.submit(() -> System.out.println(9));
pool.submit(() -> System.out.println(10));

pool.awaitTermination(5, TimeUnit.SECONDS);
pool.shutdown();

我只是在将结果收集到列表中时得到的

final List<Integer> result = Flux.range(1, 15)
    .parallel()
    .runOn(Schedulers.elastic())
    .map(n -> {
        // simulating heavy task
        sleep((long) (Math.random() * 100));
        return n;
    })
    .sequential()
    .collectList().block();
System.out.println(result);

有更好的方法吗?

首先请记住,您的阻塞 map 没问题 因为您 运行 它处于弹性 Scheduler 中(可能也是parallel())。

如果你不关心结果并想在终止时阻塞,你可以使用 .blockLast()(它会 return 最后的 n 计算,但你可以忽略那个。

如果你想在链上进一步组合,比如启动另一个进程,你可以链接任何类型的 then,这将 return 一个代表任务终止的 Mono (删除数据事件)。