如何使用Reactor框架2.x实现多线程map/reduce?

How to perform multi-threaded map/reduce using Reactor framework 2.x?

我之前问过 this question Reactor 1.x:

Let's say I have a Collection<Map>. I want to:

Transform each Map instance to an object of type Foo concurrently (each instance is totally independent of another - there is no need to convert each serially/iteratively).

When all of them are converted, I want a a method, onReduce(Collection<Foo> foos), to be called - the argument contains all of the resulting Foo instances.

但我们似乎找不到 Reactor 的等效解决方案 2.x - 只是单线程。

如何在 Reactor 2.x 中执行多线程 map/reduce?例如,您如何使用基于 ExecutorService 的 Dispatcher 执行此操作?

现在使用 Reactor 2.0 实际上非常容易。你可以这样做:

List<Map<String, Object>> data = readData(); // <1>

Streams.from(data)
       .flatMap(m -> Streams.just(m)
                            .dispatchOn(Environment.cachedDispatcher()) // <2>
                            .map(ignored -> Thread.currentThread().getName()))
       .buffer() // <3>
       .consume(s -> System.out.println("s: " + s)); // <4>
  1. 根据输入数据创建Stream
  2. 为每个 Map 创建一个新的 Stream 并在给定的 Dispatcher 上分派地图操作。
  3. 缓冲所有值直到完成,当收集清空时将发送到下游。
  4. Consume List,它是来自子流的负载平衡转换的结果。