如何使用Reactor框架2.x实现多线程map/reduce?
How to perform multi-threaded map/reduce using Reactor framework 2.x?
我之前问过 this question Reactor 1.x:
Let's say I have a Collection<Map>
. I want to:
Transform each Map
instance to an object of type Foo
concurrently (each instance is totally independent of another - there is no need to convert each serially/iteratively).
When all of them are converted, I want a a method, onReduce(Collection<Foo> foos)
, to be called - the argument contains all of the resulting Foo
instances.
但我们似乎找不到 Reactor 的等效解决方案 2.x - 只是单线程。
如何在 Reactor 2.x 中执行多线程 map/reduce?例如,您如何使用基于 ExecutorService 的 Dispatcher 执行此操作?
现在使用 Reactor 2.0 实际上非常容易。你可以这样做:
List<Map<String, Object>> data = readData(); // <1>
Streams.from(data)
.flatMap(m -> Streams.just(m)
.dispatchOn(Environment.cachedDispatcher()) // <2>
.map(ignored -> Thread.currentThread().getName()))
.buffer() // <3>
.consume(s -> System.out.println("s: " + s)); // <4>
- 根据输入数据创建
Stream
。
- 为每个
Map
创建一个新的 Stream
并在给定的 Dispatcher
上分派地图操作。
- 缓冲所有值直到完成,当收集清空时将发送到下游。
- Consume List,它是来自子流的负载平衡转换的结果。
我之前问过 this question Reactor 1.x:
Let's say I have a
Collection<Map>
. I want to:Transform each
Map
instance to an object of typeFoo
concurrently (each instance is totally independent of another - there is no need to convert each serially/iteratively).When all of them are converted, I want a a method,
onReduce(Collection<Foo> foos)
, to be called - the argument contains all of the resultingFoo
instances.
但我们似乎找不到 Reactor 的等效解决方案 2.x - 只是单线程。
如何在 Reactor 2.x 中执行多线程 map/reduce?例如,您如何使用基于 ExecutorService 的 Dispatcher 执行此操作?
现在使用 Reactor 2.0 实际上非常容易。你可以这样做:
List<Map<String, Object>> data = readData(); // <1>
Streams.from(data)
.flatMap(m -> Streams.just(m)
.dispatchOn(Environment.cachedDispatcher()) // <2>
.map(ignored -> Thread.currentThread().getName()))
.buffer() // <3>
.consume(s -> System.out.println("s: " + s)); // <4>
- 根据输入数据创建
Stream
。 - 为每个
Map
创建一个新的Stream
并在给定的Dispatcher
上分派地图操作。 - 缓冲所有值直到完成,当收集清空时将发送到下游。
- Consume List,它是来自子流的负载平衡转换的结果。